mirror of
				https://github.com/minio/minio.git
				synced 2025-10-30 00:05:02 -04:00 
			
		
		
		
	This PR brings an additional logger implementation called AuditLog which logs to http targets The intention is to use AuditLog to log all incoming requests, this is used as a mechanism by external log collection entities for processing Minio requests.
		
			
				
	
	
		
			702 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			702 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|  * Minio Cloud Storage, (C) 2018 Minio, Inc.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| 
 | |
| package cmd
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"encoding/xml"
 | |
| 	"fmt"
 | |
| 	"net/url"
 | |
| 	"path"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/minio/minio/cmd/logger"
 | |
| 	"github.com/minio/minio/pkg/event"
 | |
| 	xnet "github.com/minio/minio/pkg/net"
 | |
| 	"github.com/minio/minio/pkg/policy"
 | |
| )
 | |
| 
 | |
| // NotificationSys - notification system.
 | |
| type NotificationSys struct {
 | |
| 	sync.RWMutex
 | |
| 	targetList                 *event.TargetList
 | |
| 	bucketRulesMap             map[string]event.RulesMap
 | |
| 	bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
 | |
| 	peerRPCClientMap           map[xnet.Host]*PeerRPCClient
 | |
| }
 | |
| 
 | |
| // GetARNList - returns available ARNs.
 | |
| func (sys *NotificationSys) GetARNList() []string {
 | |
| 	arns := []string{}
 | |
| 	region := globalServerConfig.GetRegion()
 | |
| 	for _, targetID := range sys.targetList.List() {
 | |
| 		// httpclient target is part of ListenBucketNotification
 | |
| 		// which doesn't need to be listed as part of the ARN list
 | |
| 		// This list is only meant for external targets, filter
 | |
| 		// this out pro-actively.
 | |
| 		if !strings.HasPrefix(targetID.ID, "httpclient+") {
 | |
| 			arns = append(arns, targetID.ToARN(region).String())
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return arns
 | |
| }
 | |
| 
 | |
| // GetPeerRPCClient - returns PeerRPCClient of addr.
 | |
| func (sys *NotificationSys) GetPeerRPCClient(addr xnet.Host) *PeerRPCClient {
 | |
| 	return sys.peerRPCClientMap[addr]
 | |
| }
 | |
| 
 | |
| // NotificationPeerErr returns error associated for a remote peer.
 | |
| type NotificationPeerErr struct {
 | |
| 	Host xnet.Host // Remote host on which the rpc call was initiated
 | |
| 	Err  error     // Error returned by the remote peer for an rpc call
 | |
| }
 | |
| 
 | |
| // DeleteBucket - calls DeleteBucket RPC call on all peers.
 | |
| func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) {
 | |
| 	go func() {
 | |
| 		var wg sync.WaitGroup
 | |
| 		for addr, client := range sys.peerRPCClientMap {
 | |
| 			wg.Add(1)
 | |
| 			go func(addr xnet.Host, client *PeerRPCClient) {
 | |
| 				defer wg.Done()
 | |
| 				if err := client.DeleteBucket(bucketName); err != nil {
 | |
| 					logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
 | |
| 					logger.LogIf(ctx, err)
 | |
| 				}
 | |
| 			}(addr, client)
 | |
| 		}
 | |
| 		wg.Wait()
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // LoadCredentials - calls LoadCredentials RPC call on all peers.
 | |
| func (sys *NotificationSys) LoadCredentials() map[xnet.Host]error {
 | |
| 	errors := make(map[xnet.Host]error)
 | |
| 	var wg sync.WaitGroup
 | |
| 	for addr, client := range sys.peerRPCClientMap {
 | |
| 		wg.Add(1)
 | |
| 		go func(addr xnet.Host, client *PeerRPCClient) {
 | |
| 			defer wg.Done()
 | |
| 			// Try to set credentials in three attempts.
 | |
| 			for i := 0; i < 3; i++ {
 | |
| 				err := client.LoadCredentials()
 | |
| 				if err == nil {
 | |
| 					break
 | |
| 				}
 | |
| 				errors[addr] = err
 | |
| 				// Wait for one second and no need wait after last attempt.
 | |
| 				if i < 2 {
 | |
| 					time.Sleep(1 * time.Second)
 | |
| 				}
 | |
| 			}
 | |
| 		}(addr, client)
 | |
| 	}
 | |
| 	wg.Wait()
 | |
| 
 | |
| 	return errors
 | |
| }
 | |
| 
 | |
| // SetBucketPolicy - calls SetBucketPolicy RPC call on all peers.
 | |
| func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName string, bucketPolicy *policy.Policy) {
 | |
| 	go func() {
 | |
| 		var wg sync.WaitGroup
 | |
| 		for addr, client := range sys.peerRPCClientMap {
 | |
| 			wg.Add(1)
 | |
| 			go func(addr xnet.Host, client *PeerRPCClient) {
 | |
| 				defer wg.Done()
 | |
| 				if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil {
 | |
| 					logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
 | |
| 					logger.LogIf(ctx, err)
 | |
| 				}
 | |
| 			}(addr, client)
 | |
| 		}
 | |
| 		wg.Wait()
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers.
 | |
| func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName string) {
 | |
| 	go func() {
 | |
| 		var wg sync.WaitGroup
 | |
| 		for addr, client := range sys.peerRPCClientMap {
 | |
| 			wg.Add(1)
 | |
| 			go func(addr xnet.Host, client *PeerRPCClient) {
 | |
| 				defer wg.Done()
 | |
| 				if err := client.RemoveBucketPolicy(bucketName); err != nil {
 | |
| 					logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
 | |
| 					logger.LogIf(ctx, err)
 | |
| 				}
 | |
| 			}(addr, client)
 | |
| 		}
 | |
| 		wg.Wait()
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // PutBucketNotification - calls PutBucketNotification RPC call on all peers.
 | |
| func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketName string, rulesMap event.RulesMap) {
 | |
| 	go func() {
 | |
| 		var wg sync.WaitGroup
 | |
| 		for addr, client := range sys.peerRPCClientMap {
 | |
| 			wg.Add(1)
 | |
| 			go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) {
 | |
| 				defer wg.Done()
 | |
| 				if err := client.PutBucketNotification(bucketName, rulesMap); err != nil {
 | |
| 					logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
 | |
| 					logger.LogIf(ctx, err)
 | |
| 				}
 | |
| 			}(addr, client, rulesMap.Clone())
 | |
| 		}
 | |
| 		wg.Wait()
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // ListenBucketNotification - calls ListenBucketNotification RPC call on all peers.
 | |
| func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string, eventNames []event.Name, pattern string,
 | |
| 	targetID event.TargetID, localPeer xnet.Host) {
 | |
| 	go func() {
 | |
| 		var wg sync.WaitGroup
 | |
| 		for addr, client := range sys.peerRPCClientMap {
 | |
| 			wg.Add(1)
 | |
| 			go func(addr xnet.Host, client *PeerRPCClient) {
 | |
| 				defer wg.Done()
 | |
| 				if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil {
 | |
| 					logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
 | |
| 					logger.LogIf(ctx, err)
 | |
| 				}
 | |
| 			}(addr, client)
 | |
| 		}
 | |
| 		wg.Wait()
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name.
 | |
| func (sys *NotificationSys) AddRemoteTarget(bucketName string, target event.Target, rulesMap event.RulesMap) error {
 | |
| 	if err := sys.targetList.Add(target); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	sys.Lock()
 | |
| 	targetMap := sys.bucketRemoteTargetRulesMap[bucketName]
 | |
| 	if targetMap == nil {
 | |
| 		targetMap = make(map[event.TargetID]event.RulesMap)
 | |
| 	}
 | |
| 
 | |
| 	rulesMap = rulesMap.Clone()
 | |
| 	targetMap[target.ID()] = rulesMap
 | |
| 	sys.bucketRemoteTargetRulesMap[bucketName] = targetMap
 | |
| 
 | |
| 	rulesMap = rulesMap.Clone()
 | |
| 	rulesMap.Add(sys.bucketRulesMap[bucketName])
 | |
| 	sys.bucketRulesMap[bucketName] = rulesMap
 | |
| 
 | |
| 	sys.Unlock()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // RemoteTargetExist - checks whether given target ID is a HTTP/PeerRPC client target or not.
 | |
| func (sys *NotificationSys) RemoteTargetExist(bucketName string, targetID event.TargetID) bool {
 | |
| 	sys.Lock()
 | |
| 	defer sys.Unlock()
 | |
| 
 | |
| 	targetMap, ok := sys.bucketRemoteTargetRulesMap[bucketName]
 | |
| 	if ok {
 | |
| 		_, ok = targetMap[targetID]
 | |
| 	}
 | |
| 
 | |
| 	return ok
 | |
| }
 | |
| 
 | |
| // initListeners - initializes PeerRPC clients available in listener.json.
 | |
| func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLayer, bucketName string) error {
 | |
| 	// listener.json is available/applicable only in DistXL mode.
 | |
| 	if !globalIsDistXL {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Construct path to listener.json for the given bucket.
 | |
| 	configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig)
 | |
| 	transactionConfigFile := configFile + ".transaction"
 | |
| 
 | |
| 	// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
 | |
| 	// and configFile, take a transaction lock to avoid data race between readConfig()
 | |
| 	// and saveConfig().
 | |
| 	objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
 | |
| 	if err := objLock.GetRLock(globalOperationTimeout); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer objLock.RUnlock()
 | |
| 
 | |
| 	configData, e := readConfig(ctx, objAPI, configFile)
 | |
| 	if e != nil && !IsErrIgnored(e, errDiskNotFound, errConfigNotFound) {
 | |
| 		return e
 | |
| 	}
 | |
| 
 | |
| 	listenerList := []ListenBucketNotificationArgs{}
 | |
| 	if configData != nil {
 | |
| 		if err := json.Unmarshal(configData, &listenerList); err != nil {
 | |
| 			logger.LogIf(ctx, err)
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if len(listenerList) == 0 {
 | |
| 		// Nothing to initialize for empty listener list.
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	for _, args := range listenerList {
 | |
| 		found, err := isLocalHost(args.Addr.Name)
 | |
| 		if err != nil {
 | |
| 			logger.GetReqInfo(ctx).AppendTags("host", args.Addr.Name)
 | |
| 			logger.LogIf(ctx, err)
 | |
| 			return err
 | |
| 		}
 | |
| 		if found {
 | |
| 			// As this function is called at startup, skip HTTP listener to this host.
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		rpcClient := sys.GetPeerRPCClient(args.Addr)
 | |
| 		if rpcClient == nil {
 | |
| 			return fmt.Errorf("unable to find PeerRPCClient by address %v in listener.json for bucket %v", args.Addr, bucketName)
 | |
| 		}
 | |
| 
 | |
| 		exist, err := rpcClient.RemoteTargetExist(bucketName, args.TargetID)
 | |
| 		if err != nil {
 | |
| 			logger.GetReqInfo(ctx).AppendTags("targetID", args.TargetID.Name)
 | |
| 			logger.LogIf(ctx, err)
 | |
| 			return err
 | |
| 		}
 | |
| 		if !exist {
 | |
| 			// Skip previously connected HTTP listener which is not found in remote peer.
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		target := NewPeerRPCClientTarget(bucketName, args.TargetID, rpcClient)
 | |
| 		rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
 | |
| 		if err = sys.AddRemoteTarget(bucketName, target, rulesMap); err != nil {
 | |
| 			logger.GetReqInfo(ctx).AppendTags("targetName", target.id.Name)
 | |
| 			logger.LogIf(ctx, err)
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (sys *NotificationSys) refresh(objAPI ObjectLayer) error {
 | |
| 	buckets, err := objAPI.ListBuckets(context.Background())
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	for _, bucket := range buckets {
 | |
| 		ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name})
 | |
| 		config, err := readNotificationConfig(ctx, objAPI, bucket.Name)
 | |
| 		if err != nil && err != errNoSuchNotifications {
 | |
| 			return err
 | |
| 		}
 | |
| 		if err == errNoSuchNotifications {
 | |
| 			continue
 | |
| 		}
 | |
| 		sys.AddRulesMap(bucket.Name, config.ToRulesMap())
 | |
| 		if err = sys.initListeners(ctx, objAPI, bucket.Name); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Init - initializes notification system from notification.xml and listener.json of all buckets.
 | |
| func (sys *NotificationSys) Init(objAPI ObjectLayer) error {
 | |
| 	if objAPI == nil {
 | |
| 		return errInvalidArgument
 | |
| 	}
 | |
| 
 | |
| 	doneCh := make(chan struct{})
 | |
| 	defer close(doneCh)
 | |
| 
 | |
| 	// Initializing notification needs a retry mechanism for
 | |
| 	// the following reasons:
 | |
| 	//  - Read quorum is lost just after the initialization
 | |
| 	//    of the object layer.
 | |
| 	retryTimerCh := newRetryTimerSimple(doneCh)
 | |
| 	for {
 | |
| 		select {
 | |
| 		case _ = <-retryTimerCh:
 | |
| 			if err := sys.refresh(objAPI); err != nil {
 | |
| 				if err == errDiskNotFound ||
 | |
| 					strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
 | |
| 					strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
 | |
| 					logger.Info("Waiting for notification subsystem to be initialized..")
 | |
| 					continue
 | |
| 				}
 | |
| 				return err
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // AddRulesMap - adds rules map for bucket name.
 | |
| func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) {
 | |
| 	sys.Lock()
 | |
| 	defer sys.Unlock()
 | |
| 
 | |
| 	rulesMap = rulesMap.Clone()
 | |
| 
 | |
| 	for _, targetRulesMap := range sys.bucketRemoteTargetRulesMap[bucketName] {
 | |
| 		rulesMap.Add(targetRulesMap)
 | |
| 	}
 | |
| 
 | |
| 	// Do not add for an empty rulesMap.
 | |
| 	if len(rulesMap) == 0 {
 | |
| 		delete(sys.bucketRulesMap, bucketName)
 | |
| 	} else {
 | |
| 		sys.bucketRulesMap[bucketName] = rulesMap
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // RemoveRulesMap - removes rules map for bucket name.
 | |
| func (sys *NotificationSys) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) {
 | |
| 	sys.Lock()
 | |
| 	defer sys.Unlock()
 | |
| 
 | |
| 	sys.bucketRulesMap[bucketName].Remove(rulesMap)
 | |
| 	if len(sys.bucketRulesMap[bucketName]) == 0 {
 | |
| 		delete(sys.bucketRulesMap, bucketName)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // RemoveNotification - removes all notification configuration for bucket name.
 | |
| func (sys *NotificationSys) RemoveNotification(bucketName string) {
 | |
| 	sys.Lock()
 | |
| 	defer sys.Unlock()
 | |
| 
 | |
| 	delete(sys.bucketRulesMap, bucketName)
 | |
| 
 | |
| 	for targetID := range sys.bucketRemoteTargetRulesMap[bucketName] {
 | |
| 		sys.targetList.Remove(targetID)
 | |
| 		delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID)
 | |
| 	}
 | |
| 
 | |
| 	delete(sys.bucketRemoteTargetRulesMap, bucketName)
 | |
| }
 | |
| 
 | |
| // RemoveAllRemoteTargets - closes and removes all HTTP/PeerRPC client targets.
 | |
| func (sys *NotificationSys) RemoveAllRemoteTargets() {
 | |
| 	for _, targetMap := range sys.bucketRemoteTargetRulesMap {
 | |
| 		for targetID := range targetMap {
 | |
| 			sys.targetList.Remove(targetID)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // RemoveRemoteTarget - closes and removes target by target ID.
 | |
| func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event.TargetID) {
 | |
| 	for terr := range sys.targetList.Remove(targetID) {
 | |
| 		reqInfo := (&logger.ReqInfo{}).AppendTags("targetID", terr.ID.Name)
 | |
| 		ctx := logger.SetReqInfo(context.Background(), reqInfo)
 | |
| 		logger.LogIf(ctx, terr.Err)
 | |
| 	}
 | |
| 
 | |
| 	sys.Lock()
 | |
| 	defer sys.Unlock()
 | |
| 
 | |
| 	if _, ok := sys.bucketRemoteTargetRulesMap[bucketName]; ok {
 | |
| 		delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID)
 | |
| 		if len(sys.bucketRemoteTargetRulesMap[bucketName]) == 0 {
 | |
| 			delete(sys.bucketRemoteTargetRulesMap, bucketName)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (sys *NotificationSys) send(bucketName string, eventData event.Event, targetIDs ...event.TargetID) (errs []event.TargetIDErr) {
 | |
| 	errCh := sys.targetList.Send(eventData, targetIDs...)
 | |
| 	for terr := range errCh {
 | |
| 		errs = append(errs, terr)
 | |
| 		if sys.RemoteTargetExist(bucketName, terr.ID) {
 | |
| 			sys.RemoveRemoteTarget(bucketName, terr.ID)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return errs
 | |
| }
 | |
| 
 | |
| // Send - sends event data to all matching targets.
 | |
| func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
 | |
| 	sys.RLock()
 | |
| 	targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
 | |
| 	sys.RUnlock()
 | |
| 
 | |
| 	if len(targetIDSet) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	targetIDs := targetIDSet.ToSlice()
 | |
| 	return sys.send(args.BucketName, args.ToEvent(), targetIDs...)
 | |
| }
 | |
| 
 | |
| // NewNotificationSys - creates new notification system object.
 | |
| func NewNotificationSys(config *serverConfig, endpoints EndpointList) *NotificationSys {
 | |
| 	targetList := getNotificationTargets(config)
 | |
| 	peerRPCClientMap := makeRemoteRPCClients(endpoints)
 | |
| 
 | |
| 	// bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init()
 | |
| 	return &NotificationSys{
 | |
| 		targetList:                 targetList,
 | |
| 		bucketRulesMap:             make(map[string]event.RulesMap),
 | |
| 		bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
 | |
| 		peerRPCClientMap:           peerRPCClientMap,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type eventArgs struct {
 | |
| 	EventName    event.Name
 | |
| 	BucketName   string
 | |
| 	Object       ObjectInfo
 | |
| 	ReqParams    map[string]string
 | |
| 	RespElements map[string]string
 | |
| 	Host         string
 | |
| 	Port         string
 | |
| 	UserAgent    string
 | |
| }
 | |
| 
 | |
| // ToEvent - converts to notification event.
 | |
| func (args eventArgs) ToEvent() event.Event {
 | |
| 	getOriginEndpoint := func() string {
 | |
| 		host := globalMinioHost
 | |
| 		if host == "" {
 | |
| 			// FIXME: Send FQDN or hostname of this machine than sending IP address.
 | |
| 			host = localIP4.ToSlice()[0]
 | |
| 		}
 | |
| 
 | |
| 		return fmt.Sprintf("%s://%s:%s", getURLScheme(globalIsSSL), host, globalMinioPort)
 | |
| 	}
 | |
| 
 | |
| 	creds := globalServerConfig.GetCredential()
 | |
| 	eventTime := UTCNow()
 | |
| 	uniqueID := fmt.Sprintf("%X", eventTime.UnixNano())
 | |
| 
 | |
| 	respElements := map[string]string{
 | |
| 		"x-amz-request-id":        uniqueID,
 | |
| 		"x-minio-origin-endpoint": getOriginEndpoint(), // Minio specific custom elements.
 | |
| 	}
 | |
| 	if args.RespElements["content-length"] != "" {
 | |
| 		respElements["content-length"] = args.RespElements["content-length"]
 | |
| 	}
 | |
| 	newEvent := event.Event{
 | |
| 		EventVersion:      "2.0",
 | |
| 		EventSource:       "minio:s3",
 | |
| 		AwsRegion:         globalServerConfig.GetRegion(),
 | |
| 		EventTime:         eventTime.Format(event.AMZTimeFormat),
 | |
| 		EventName:         args.EventName,
 | |
| 		UserIdentity:      event.Identity{creds.AccessKey},
 | |
| 		RequestParameters: args.ReqParams,
 | |
| 		ResponseElements:  respElements,
 | |
| 		S3: event.Metadata{
 | |
| 			SchemaVersion:   "1.0",
 | |
| 			ConfigurationID: "Config",
 | |
| 			Bucket: event.Bucket{
 | |
| 				Name:          args.BucketName,
 | |
| 				OwnerIdentity: event.Identity{creds.AccessKey},
 | |
| 				ARN:           policy.ResourceARNPrefix + args.BucketName,
 | |
| 			},
 | |
| 			Object: event.Object{
 | |
| 				Key:       url.QueryEscape(args.Object.Name),
 | |
| 				VersionID: "1",
 | |
| 				Sequencer: uniqueID,
 | |
| 			},
 | |
| 		},
 | |
| 		Source: event.Source{
 | |
| 			Host:      args.Host,
 | |
| 			Port:      args.Port,
 | |
| 			UserAgent: args.UserAgent,
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	if args.EventName != event.ObjectRemovedDelete {
 | |
| 		newEvent.S3.Object.ETag = args.Object.ETag
 | |
| 		newEvent.S3.Object.Size = args.Object.Size
 | |
| 		newEvent.S3.Object.ContentType = args.Object.ContentType
 | |
| 		newEvent.S3.Object.UserMetadata = args.Object.UserDefined
 | |
| 	}
 | |
| 
 | |
| 	return newEvent
 | |
| }
 | |
| 
 | |
| func sendEvent(args eventArgs) {
 | |
| 	// globalNotificationSys is not initialized in gateway mode.
 | |
| 	if globalNotificationSys == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	notifyCh := globalNotificationSys.Send(args)
 | |
| 	go func() {
 | |
| 		for _, err := range notifyCh {
 | |
| 			reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name}
 | |
| 			reqInfo.AppendTags("EventName", args.EventName.String())
 | |
| 			reqInfo.AppendTags("targetID", err.ID.Name)
 | |
| 			ctx := logger.SetReqInfo(context.Background(), reqInfo)
 | |
| 			logger.LogOnceIf(ctx, err.Err, err.ID)
 | |
| 		}
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| func readNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucketName string) (*event.Config, error) {
 | |
| 	// Construct path to notification.xml for the given bucket.
 | |
| 	configFile := path.Join(bucketConfigPrefix, bucketName, bucketNotificationConfig)
 | |
| 	configData, err := readConfig(ctx, objAPI, configFile)
 | |
| 	if err != nil {
 | |
| 		if err == errConfigNotFound {
 | |
| 			err = errNoSuchNotifications
 | |
| 		}
 | |
| 
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	config, err := event.ParseConfig(bytes.NewReader(configData), globalServerConfig.GetRegion(), globalNotificationSys.targetList)
 | |
| 	logger.LogIf(ctx, err)
 | |
| 	return config, err
 | |
| }
 | |
| 
 | |
| func saveNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucketName string, config *event.Config) error {
 | |
| 	data, err := xml.Marshal(config)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	configFile := path.Join(bucketConfigPrefix, bucketName, bucketNotificationConfig)
 | |
| 	return saveConfig(ctx, objAPI, configFile, data)
 | |
| }
 | |
| 
 | |
| // SaveListener - saves HTTP client currently listening for events to listener.json.
 | |
| func SaveListener(objAPI ObjectLayer, bucketName string, eventNames []event.Name, pattern string, targetID event.TargetID, addr xnet.Host) error {
 | |
| 	// listener.json is available/applicable only in DistXL mode.
 | |
| 	if !globalIsDistXL {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName})
 | |
| 
 | |
| 	// Construct path to listener.json for the given bucket.
 | |
| 	configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig)
 | |
| 	transactionConfigFile := configFile + ".transaction"
 | |
| 
 | |
| 	// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
 | |
| 	// and configFile, take a transaction lock to avoid data race between readConfig()
 | |
| 	// and saveConfig().
 | |
| 	objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
 | |
| 	if err := objLock.GetLock(globalOperationTimeout); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer objLock.Unlock()
 | |
| 
 | |
| 	configData, err := readConfig(ctx, objAPI, configFile)
 | |
| 	if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	listenerList := []ListenBucketNotificationArgs{}
 | |
| 	if configData != nil {
 | |
| 		if err = json.Unmarshal(configData, &listenerList); err != nil {
 | |
| 			logger.LogIf(ctx, err)
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	listenerList = append(listenerList, ListenBucketNotificationArgs{
 | |
| 		EventNames: eventNames,
 | |
| 		Pattern:    pattern,
 | |
| 		TargetID:   targetID,
 | |
| 		Addr:       addr,
 | |
| 	})
 | |
| 
 | |
| 	data, err := json.Marshal(listenerList)
 | |
| 	if err != nil {
 | |
| 		logger.LogIf(ctx, err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return saveConfig(ctx, objAPI, configFile, data)
 | |
| }
 | |
| 
 | |
| // RemoveListener - removes HTTP client currently listening for events from listener.json.
 | |
| func RemoveListener(objAPI ObjectLayer, bucketName string, targetID event.TargetID, addr xnet.Host) error {
 | |
| 	// listener.json is available/applicable only in DistXL mode.
 | |
| 	if !globalIsDistXL {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName})
 | |
| 
 | |
| 	// Construct path to listener.json for the given bucket.
 | |
| 	configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig)
 | |
| 	transactionConfigFile := configFile + ".transaction"
 | |
| 
 | |
| 	// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
 | |
| 	// and configFile, take a transaction lock to avoid data race between readConfig()
 | |
| 	// and saveConfig().
 | |
| 	objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
 | |
| 	if err := objLock.GetLock(globalOperationTimeout); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer objLock.Unlock()
 | |
| 
 | |
| 	configData, err := readConfig(ctx, objAPI, configFile)
 | |
| 	if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	listenerList := []ListenBucketNotificationArgs{}
 | |
| 	if configData != nil {
 | |
| 		if err = json.Unmarshal(configData, &listenerList); err != nil {
 | |
| 			logger.LogIf(ctx, err)
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if len(listenerList) == 0 {
 | |
| 		// Nothing to remove.
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	activeListenerList := []ListenBucketNotificationArgs{}
 | |
| 	for _, args := range listenerList {
 | |
| 		if args.TargetID == targetID && args.Addr.Equal(addr) {
 | |
| 			// Skip if matches
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		activeListenerList = append(activeListenerList, args)
 | |
| 	}
 | |
| 
 | |
| 	data, err := json.Marshal(activeListenerList)
 | |
| 	if err != nil {
 | |
| 		logger.LogIf(ctx, err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return saveConfig(ctx, objAPI, configFile, data)
 | |
| }
 |