optimize Listen bucket notification implementation (#9444)

this commit avoids lots of tiny allocations, repeated
channel creates which are performed when filtering
the incoming events, unescaping a key just for matching.

also remove deprecated code which is not needed
anymore, avoids unexpected data structure transformations
from the map to slice.
This commit is contained in:
Harshavardhana
2020-04-27 06:25:05 -07:00
committed by GitHub
parent f216670814
commit f14bf25cb9
17 changed files with 103 additions and 480 deletions

View File

@@ -22,7 +22,6 @@ import (
"errors"
"io"
"net/http"
"net/url"
"path"
"reflect"
"time"
@@ -296,11 +295,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
if ev.S3.Bucket.Name != values.Get(peerRESTListenBucket) {
return false
}
objectName, uerr := url.QueryUnescape(ev.S3.Object.Key)
if uerr != nil {
objectName = ev.S3.Object.Key
}
return len(rulesMap.Match(ev.EventName, objectName).ToSlice()) != 0
return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key)
})
for _, peer := range peers {

View File

@@ -131,6 +131,14 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
globalRootCAs, err = config.GetRootCAs(globalCertsCADir.Get())
logger.FatalIf(err, "Failed to read root CAs (%v)", err)
globalMinioEndpoint = func() string {
host := globalMinioHost
if host == "" {
host = sortIPs(localIP4.ToSlice())[0]
}
return fmt.Sprintf("%s://%s", getURLScheme(globalIsSSL), net.JoinHostPort(host, globalMinioPort))
}()
// Handle gateway specific env
gatewayHandleEnvVars()

View File

@@ -138,6 +138,8 @@ var (
globalMinioPort = globalMinioDefaultPort
// Holds the host that was passed using --address
globalMinioHost = ""
// Holds the possible host endpoint.
globalMinioEndpoint = ""
// globalConfigSys server config system.
globalConfigSys *ConfigSys

View File

@@ -22,7 +22,6 @@ import (
"encoding/xml"
"fmt"
"io"
"net"
"net/url"
"path"
"sort"
@@ -50,6 +49,7 @@ import (
type NotificationSys struct {
sync.RWMutex
targetList *event.TargetList
targetResCh chan event.TargetIDResult
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
peerClients []*peerRESTClient
@@ -662,19 +662,6 @@ func (sys *NotificationSys) AddRemoteTarget(bucketName string, target event.Targ
return nil
}
// RemoteTargetExist - checks whether given target ID is a HTTP/PeerRPC client target or not.
func (sys *NotificationSys) RemoteTargetExist(bucketName string, targetID event.TargetID) bool {
sys.Lock()
defer sys.Unlock()
targetMap, ok := sys.bucketRemoteTargetRulesMap[bucketName]
if ok {
_, ok = targetMap[targetID]
}
return ok
}
// Loads notification policies for all buckets into NotificationSys.
func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
for _, bucket := range buckets {
@@ -713,6 +700,17 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
}
}
go func() {
for res := range sys.targetResCh {
if res.Err != nil {
reqInfo := &logger.ReqInfo{}
reqInfo.AppendTags("targetID", res.ID.Name)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogOnceIf(ctx, res.Err, res.ID)
}
}
}()
return sys.load(buckets, objAPI)
}
@@ -759,7 +757,9 @@ func (sys *NotificationSys) ConfiguredTargetIDs() []event.TargetID {
for _, rmap := range sys.bucketRulesMap {
for _, rules := range rmap {
for _, targetSet := range rules {
targetIDs = append(targetIDs, targetSet.ToSlice()...)
for id := range targetSet {
targetIDs = append(targetIDs, id)
}
}
}
}
@@ -780,69 +780,41 @@ func (sys *NotificationSys) RemoveNotification(bucketName string) {
delete(sys.bucketRulesMap, bucketName)
targetIDSet := event.NewTargetIDSet()
for targetID := range sys.bucketRemoteTargetRulesMap[bucketName] {
sys.targetList.Remove(targetID)
targetIDSet[targetID] = struct{}{}
delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID)
}
sys.targetList.Remove(targetIDSet)
delete(sys.bucketRemoteTargetRulesMap, bucketName)
}
// RemoveAllRemoteTargets - closes and removes all HTTP/PeerRPC client targets.
// RemoveAllRemoteTargets - closes and removes all notification targets.
func (sys *NotificationSys) RemoveAllRemoteTargets() {
sys.Lock()
defer sys.Unlock()
for _, targetMap := range sys.bucketRemoteTargetRulesMap {
for targetID := range targetMap {
sys.targetList.Remove(targetID)
targetIDSet := event.NewTargetIDSet()
for k := range targetMap {
targetIDSet[k] = struct{}{}
}
sys.targetList.Remove(targetIDSet)
}
}
// RemoveRemoteTarget - closes and removes target by target ID.
func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event.TargetID) {
for terr := range sys.targetList.Remove(targetID) {
reqInfo := (&logger.ReqInfo{}).AppendTags("targetID", terr.ID.Name)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, terr.Err)
}
sys.Lock()
defer sys.Unlock()
if _, ok := sys.bucketRemoteTargetRulesMap[bucketName]; ok {
delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID)
if len(sys.bucketRemoteTargetRulesMap[bucketName]) == 0 {
delete(sys.bucketRemoteTargetRulesMap, bucketName)
}
}
}
func (sys *NotificationSys) send(bucketName string, eventData event.Event, targetIDs ...event.TargetID) (errs []event.TargetIDErr) {
errCh := sys.targetList.Send(eventData, targetIDs...)
for terr := range errCh {
errs = append(errs, terr)
if sys.RemoteTargetExist(bucketName, terr.ID) {
sys.RemoveRemoteTarget(bucketName, terr.ID)
}
}
return errs
}
// Send - sends event data to all matching targets.
func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
func (sys *NotificationSys) Send(args eventArgs) {
sys.RLock()
targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
sys.RUnlock()
if len(targetIDSet) == 0 {
return nil
return
}
targetIDs := targetIDSet.ToSlice()
return sys.send(args.BucketName, args.ToEvent(), targetIDs...)
sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)
}
// PutBucketObjectLockConfig - put bucket object lock configuration to all peers.
@@ -1204,6 +1176,7 @@ func NewNotificationSys(endpoints EndpointZones) *NotificationSys {
// bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init()
return &NotificationSys{
targetList: event.NewTargetList(),
targetResCh: make(chan event.TargetIDResult),
bucketRulesMap: make(map[string]event.RulesMap),
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
peerClients: newPeerRestClients(endpoints),
@@ -1221,23 +1194,13 @@ type eventArgs struct {
}
// ToEvent - converts to notification event.
func (args eventArgs) ToEvent() event.Event {
getOriginEndpoint := func() string {
host := globalMinioHost
if host == "" {
// FIXME: Send FQDN or hostname of this machine than sending IP address.
host = sortIPs(localIP4.ToSlice())[0]
}
return fmt.Sprintf("%s://%s", getURLScheme(globalIsSSL), net.JoinHostPort(host, globalMinioPort))
}
func (args eventArgs) ToEvent(escape bool) event.Event {
eventTime := UTCNow()
uniqueID := fmt.Sprintf("%X", eventTime.UnixNano())
respElements := map[string]string{
"x-amz-request-id": args.RespElements["requestId"],
"x-minio-origin-endpoint": getOriginEndpoint(), // MinIO specific custom elements.
"x-minio-origin-endpoint": globalMinioEndpoint, // MinIO specific custom elements.
}
// Add deployment as part of
if globalDeploymentID != "" {
@@ -1246,6 +1209,10 @@ func (args eventArgs) ToEvent() event.Event {
if args.RespElements["content-length"] != "" {
respElements["content-length"] = args.RespElements["content-length"]
}
keyName := args.Object.Name
if escape {
keyName = url.QueryEscape(args.Object.Name)
}
newEvent := event.Event{
EventVersion: "2.0",
EventSource: "minio:s3",
@@ -1264,7 +1231,7 @@ func (args eventArgs) ToEvent() event.Event {
ARN: policy.ResourceARNPrefix + args.BucketName,
},
Object: event.Object{
Key: url.QueryEscape(args.Object.Name),
Key: keyName,
VersionID: "1",
Sequencer: uniqueID,
},
@@ -1308,19 +1275,10 @@ func sendEvent(args eventArgs) {
}
if globalHTTPListen.HasSubscribers() {
globalHTTPListen.Publish(args.ToEvent())
globalHTTPListen.Publish(args.ToEvent(false))
}
notifyCh := globalNotificationSys.Send(args)
go func() {
for _, err := range notifyCh {
reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name}
reqInfo.AppendTags("EventName", args.EventName.String())
reqInfo.AppendTags("targetID", err.ID.Name)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogOnceIf(ctx, err.Err, err.ID)
}
}()
globalNotificationSys.Send(args)
}
func readNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucketName string) (*event.Config, error) {

View File

@@ -1,72 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2018, 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "github.com/minio/minio/pkg/event"
// PeerRESTClientTarget - RPCClient is an event.Target which sends event to target of remote peer.
type PeerRESTClientTarget struct {
id event.TargetID
remoteTargetID event.TargetID
restClient *peerRESTClient
bucketName string
}
// ID - returns target ID.
func (target *PeerRESTClientTarget) ID() event.TargetID {
return target.id
}
// IsActive - does nothing and available for interface compatibility.
func (target *PeerRESTClientTarget) IsActive() (bool, error) {
return true, nil
}
// HasQueueStore - No-Op. Added for interface compatibility
func (target PeerRESTClientTarget) HasQueueStore() bool {
return false
}
// Save - Sends event directly without persisting.
func (target *PeerRESTClientTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
// Send - interface compatible method does no-op.
func (target *PeerRESTClientTarget) Send(eventKey string) error {
return nil
}
// sends event to remote peer by making RPC call.
func (target *PeerRESTClientTarget) send(eventData event.Event) error {
return target.restClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData)
}
// Close - does nothing and available for interface compatibility.
func (target *PeerRESTClientTarget) Close() error {
return nil
}
// NewPeerRESTClientTarget - creates RPCClient target with given target ID available in remote peer.
func NewPeerRESTClientTarget(bucketName string, targetID event.TargetID, restClient *peerRESTClient) *PeerRESTClientTarget {
return &PeerRESTClientTarget{
id: event.TargetID{ID: targetID.ID, Name: targetID.Name + "+" + mustGetUUID()},
remoteTargetID: targetID,
bucketName: bucketName,
restClient: restClient,
}
}

View File

@@ -500,78 +500,6 @@ func (client *peerRESTClient) ReloadFormat(dryRun bool) error {
return nil
}
// SendEvent - calls send event RPC.
func (client *peerRESTClient) SendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
numTries := 10
for {
err := client.sendEvent(bucket, targetID, remoteTargetID, eventData)
if err == nil {
return nil
}
if numTries == 0 {
return err
}
numTries--
time.Sleep(5 * time.Second)
}
}
func (client *peerRESTClient) sendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
args := sendEventRequest{
TargetID: remoteTargetID,
Event: eventData,
}
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
var reader bytes.Buffer
err := gob.NewEncoder(&reader).Encode(args)
if err != nil {
return err
}
respBody, err := client.call(peerRESTMethodSendEvent, values, &reader, -1)
if err != nil {
return err
}
var eventResp sendEventResp
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&eventResp)
if err != nil || !eventResp.Success {
reqInfo := &logger.ReqInfo{BucketName: bucket}
reqInfo.AppendTags("targetID", targetID.Name)
reqInfo.AppendTags("event", eventData.EventName.String())
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
globalNotificationSys.RemoveRemoteTarget(bucket, targetID)
}
return err
}
// RemoteTargetExist - calls remote target ID exist REST API.
func (client *peerRESTClient) RemoteTargetExist(bucket string, targetID event.TargetID) (bool, error) {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
var reader bytes.Buffer
err := gob.NewEncoder(&reader).Encode(targetID)
if err != nil {
return false, err
}
respBody, err := client.call(peerRESTMethodTargetExists, values, &reader, -1)
if err != nil {
return false, err
}
defer http.DrainBody(respBody)
var targetExists remoteTargetExistsResp
err = gob.NewDecoder(respBody).Decode(&targetExists)
return targetExists.Exists, err
}
// RemoveBucketPolicy - Remove bucket policy on the peer node.
func (client *peerRESTClient) RemoveBucketPolicy(bucket string) error {
values := make(url.Values)

View File

@@ -17,7 +17,7 @@
package cmd
const (
peerRESTVersion = "v7"
peerRESTVersion = "v8"
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@@ -52,17 +52,14 @@ const (
peerRESTMethodDownloadProfilingData = "/downloadprofilingdata"
peerRESTMethodBucketPolicySet = "/setbucketpolicy"
peerRESTMethodBucketNotificationPut = "/putbucketnotification"
peerRESTMethodBucketNotificationListen = "/listenbucketnotification"
peerRESTMethodReloadFormat = "/reloadformat"
peerRESTMethodTargetExists = "/targetexists"
peerRESTMethodSendEvent = "/sendevent"
peerRESTMethodTrace = "/trace"
peerRESTMethodListen = "/listen"
peerRESTMethodLog = "/log"
peerRESTMethodBucketLifecycleSet = "/setbucketlifecycle"
peerRESTMethodBucketLifecycleRemove = "/removebucketlifecycle"
peerRESTMethodBucketEncryptionSet = "/setbucketencryption"
peerRESTMethodBucketEncryptionRemove = "/removebucketencryption"
peerRESTMethodLog = "/log"
peerRESTMethodPutBucketObjectLockConfig = "/putbucketobjectlockconfig"
peerRESTMethodBucketObjectLockConfigRemove = "/removebucketobjectlockconfig"
)

View File

@@ -24,7 +24,6 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
@@ -774,97 +773,6 @@ func (s *peerRESTServer) SetBucketSSEConfigHandler(w http.ResponseWriter, r *htt
w.(http.Flusher).Flush()
}
type remoteTargetExistsResp struct {
Exists bool
}
// TargetExistsHandler - Check if Target exists.
func (s *peerRESTServer) TargetExistsHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "TargetExists")
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
var targetID event.TargetID
if r.ContentLength <= 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&targetID)
if err != nil {
s.writeErrorResponse(w, err)
return
}
var targetExists remoteTargetExistsResp
targetExists.Exists = globalNotificationSys.RemoteTargetExist(bucketName, targetID)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(&targetExists))
}
type sendEventRequest struct {
Event event.Event
TargetID event.TargetID
}
type sendEventResp struct {
Success bool
}
// SendEventHandler - Send Event.
func (s *peerRESTServer) SendEventHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "SendEvent")
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
var eventReq sendEventRequest
if r.ContentLength <= 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&eventReq)
if err != nil {
s.writeErrorResponse(w, err)
return
}
var eventResp sendEventResp
eventResp.Success = true
errs := globalNotificationSys.send(bucketName, eventReq.Event, eventReq.TargetID)
for i := range errs {
reqInfo := (&logger.ReqInfo{}).AppendTags("Event", eventReq.Event.EventName.String())
reqInfo.AppendTags("targetName", eventReq.TargetID.Name)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, errs[i].Err)
eventResp.Success = false
s.writeErrorResponse(w, errs[i].Err)
return
}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(&eventResp))
w.(http.Flusher).Flush()
}
// PutBucketNotificationHandler - Set bucket policy.
func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@@ -1077,11 +985,7 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
if ev.S3.Bucket.Name != values.Get(peerRESTListenBucket) {
return false
}
objectName, uerr := url.QueryUnescape(ev.S3.Object.Key)
if uerr != nil {
objectName = ev.S3.Object.Key
}
return len(rulesMap.Match(ev.EventName, objectName).ToSlice()) != 0
return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key)
})
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
@@ -1239,8 +1143,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(httpTraceHdrs(server.DownloadProfilingDataHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketNotificationPut).HandlerFunc(httpTraceHdrs(server.PutBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadFormat).HandlerFunc(httpTraceHdrs(server.ReloadFormatHandler)).Queries(restQueries(peerRESTDryRun)...)

View File

@@ -111,7 +111,6 @@ func serverHandleCmdArgs(ctx *cli.Context) {
globalMinioAddr = globalCLIContext.Addr
globalMinioHost, globalMinioPort = mustSplitHostPort(globalMinioAddr)
endpoints := strings.Fields(env.Get(config.EnvEndpoints, ""))
if len(endpoints) > 0 {
globalEndpoints, globalXLSetDriveCount, setupType, err = createServerEndpoints(globalCLIContext.Addr, endpoints...)
@@ -344,6 +343,14 @@ func serverMain(ctx *cli.Context) {
globalRootCAs, err = config.GetRootCAs(globalCertsCADir.Get())
logger.FatalIf(err, "Failed to read root CAs (%v)", err)
globalMinioEndpoint = func() string {
host := globalMinioHost
if host == "" {
host = sortIPs(localIP4.ToSlice())[0]
}
return fmt.Sprintf("%s://%s", getURLScheme(globalIsSSL), net.JoinHostPort(host, globalMinioPort))
}()
// Is distributed setup, error out if no certificates are found for HTTPS endpoints.
if globalIsDistXL {
if globalEndpoints.HTTPS() && !globalIsSSL {