diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index d797ded9a..c8503a608 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -655,7 +655,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. globalNotificationSys.RemoveNotification(bucket) globalPolicySys.Remove(bucket) - for _, nerr := range globalNotificationSys.DeleteBucket(bucket) { + for nerr := range globalNotificationSys.DeleteBucket(bucket) { logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) logger.LogIf(ctx, nerr.Err) } diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index c897fedbf..3a78a145f 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -146,7 +146,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, rulesMap := config.ToRulesMap() globalNotificationSys.AddRulesMap(bucketName, rulesMap) - for _, nerr := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) { + for nerr := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) { logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) logger.LogIf(ctx, nerr.Err) } @@ -251,8 +251,8 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit return } - errs := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr) - for _, nerr := range errs { + errCh := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr) + for nerr := range errCh { logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) logger.LogIf(ctx, nerr.Err) } diff --git a/cmd/bucket-policy-handlers.go b/cmd/bucket-policy-handlers.go index ce4566bdd..23dfc6162 100644 --- a/cmd/bucket-policy-handlers.go +++ b/cmd/bucket-policy-handlers.go @@ -91,7 +91,7 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht } globalPolicySys.Set(bucket, *bucketPolicy) - for _, nerr := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) { + for nerr := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) { logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) logger.LogIf(ctx, nerr.Err) } @@ -130,7 +130,7 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r } globalPolicySys.Remove(bucket) - for _, nerr := range globalNotificationSys.RemoveBucketPolicy(bucket) { + for nerr := range globalNotificationSys.RemoveBucketPolicy(bucket) { logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) logger.LogIf(ctx, nerr.Err) } diff --git a/cmd/notification.go b/cmd/notification.go index 9d1662608..71d60e64c 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -66,119 +66,129 @@ type NotificationPeerErr struct { } // DeleteBucket - calls DeleteBucket RPC call on all peers. -func (sys *NotificationSys) DeleteBucket(bucketName string) []NotificationPeerErr { - errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) - var wg sync.WaitGroup - idx := 0 - for addr, client := range sys.peerRPCClientMap { - wg.Add(1) - go func(idx int, addr xnet.Host, client *PeerRPCClient) { - defer wg.Done() - if err := client.DeleteBucket(bucketName); err != nil { - errs[idx] = NotificationPeerErr{ - Host: addr, - Err: err, - } - } - }(idx, addr, client) - idx++ - } - wg.Wait() +func (sys *NotificationSys) DeleteBucket(bucketName string) <-chan NotificationPeerErr { + errCh := make(chan NotificationPeerErr) + go func() { + defer close(errCh) - return errs + var wg sync.WaitGroup + for addr, client := range sys.peerRPCClientMap { + wg.Add(1) + go func(addr xnet.Host, client *PeerRPCClient) { + defer wg.Done() + if err := client.DeleteBucket(bucketName); err != nil { + errCh <- NotificationPeerErr{ + Host: addr, + Err: err, + } + } + }(addr, client) + } + wg.Wait() + }() + + return errCh } // SetBucketPolicy - calls SetBucketPolicy RPC call on all peers. -func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) []NotificationPeerErr { - errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) - var wg sync.WaitGroup - idx := 0 - for addr, client := range sys.peerRPCClientMap { - wg.Add(1) - go func(idx int, addr xnet.Host, client *PeerRPCClient) { - defer wg.Done() - if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil { - errs[idx] = NotificationPeerErr{ - Host: addr, - Err: err, - } - } - }(idx, addr, client) - idx++ - } - wg.Wait() +func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) <-chan NotificationPeerErr { + errCh := make(chan NotificationPeerErr) + go func() { + defer close(errCh) - return errs + var wg sync.WaitGroup + for addr, client := range sys.peerRPCClientMap { + wg.Add(1) + go func(addr xnet.Host, client *PeerRPCClient) { + defer wg.Done() + if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil { + errCh <- NotificationPeerErr{ + Host: addr, + Err: err, + } + } + }(addr, client) + } + wg.Wait() + }() + + return errCh } // RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers. -func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) []NotificationPeerErr { - errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) - var wg sync.WaitGroup - idx := 0 - for addr, client := range sys.peerRPCClientMap { - wg.Add(1) - go func(idx int, addr xnet.Host, client *PeerRPCClient) { - defer wg.Done() - if err := client.RemoveBucketPolicy(bucketName); err != nil { - errs[idx] = NotificationPeerErr{ - Host: addr, - Err: err, - } - } - }(idx, addr, client) - idx++ - } - wg.Wait() +func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) <-chan NotificationPeerErr { + errCh := make(chan NotificationPeerErr) + go func() { + defer close(errCh) - return errs + var wg sync.WaitGroup + for addr, client := range sys.peerRPCClientMap { + wg.Add(1) + go func(addr xnet.Host, client *PeerRPCClient) { + defer wg.Done() + if err := client.RemoveBucketPolicy(bucketName); err != nil { + errCh <- NotificationPeerErr{ + Host: addr, + Err: err, + } + } + }(addr, client) + } + wg.Wait() + }() + + return errCh } // PutBucketNotification - calls PutBucketNotification RPC call on all peers. -func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) []NotificationPeerErr { - errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) - var wg sync.WaitGroup - idx := 0 - for addr, client := range sys.peerRPCClientMap { - wg.Add(1) - go func(idx int, addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) { - defer wg.Done() - if err := client.PutBucketNotification(bucketName, rulesMap); err != nil { - errs[idx] = NotificationPeerErr{ - Host: addr, - Err: err, - } - } - }(idx, addr, client, rulesMap.Clone()) - idx++ - } - wg.Wait() +func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) <-chan NotificationPeerErr { + errCh := make(chan NotificationPeerErr) + go func() { + defer close(errCh) - return errs + var wg sync.WaitGroup + for addr, client := range sys.peerRPCClientMap { + wg.Add(1) + go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) { + defer wg.Done() + if err := client.PutBucketNotification(bucketName, rulesMap); err != nil { + errCh <- NotificationPeerErr{ + Host: addr, + Err: err, + } + } + }(addr, client, rulesMap.Clone()) + } + wg.Wait() + }() + + return errCh } // ListenBucketNotification - calls ListenBucketNotification RPC call on all peers. func (sys *NotificationSys) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string, - targetID event.TargetID, localPeer xnet.Host) []NotificationPeerErr { - errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) - var wg sync.WaitGroup - idx := 0 - for addr, client := range sys.peerRPCClientMap { - wg.Add(1) - go func(idx int, addr xnet.Host, client *PeerRPCClient) { - defer wg.Done() - if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil { - errs[idx] = NotificationPeerErr{ - Host: addr, - Err: err, - } - } - }(idx, addr, client) - idx++ - } - wg.Wait() + targetID event.TargetID, localPeer xnet.Host) <-chan NotificationPeerErr { + errCh := make(chan NotificationPeerErr) + go func() { + defer close(errCh) - return errs + var wg sync.WaitGroup + for addr, client := range sys.peerRPCClientMap { + wg.Add(1) + go func(addr xnet.Host, client *PeerRPCClient) { + defer wg.Done() + if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil { + errCh <- NotificationPeerErr{ + Host: addr, + Err: err, + } + } + }(addr, client) + } + wg.Wait() + }() + + return errCh } // AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name. @@ -382,10 +392,10 @@ func (sys *NotificationSys) RemoveAllRemoteTargets() { // RemoveRemoteTarget - closes and removes target by target ID. func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event.TargetID) { - for id, err := range sys.targetList.Remove(targetID) { - reqInfo := (&logger.ReqInfo{}).AppendTags("targetID", id.Name) + for terr := range sys.targetList.Remove(targetID) { + reqInfo := (&logger.ReqInfo{}).AppendTags("targetID", terr.ID.Name) ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) + logger.LogIf(ctx, terr.Err) } sys.Lock() @@ -399,19 +409,20 @@ func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event } } -func (sys *NotificationSys) send(bucketName string, eventData event.Event, targetIDs ...event.TargetID) map[event.TargetID]error { - errMap := sys.targetList.Send(eventData, targetIDs...) - for targetID := range errMap { - if sys.RemoteTargetExist(bucketName, targetID) { - sys.RemoveRemoteTarget(bucketName, targetID) +func (sys *NotificationSys) send(bucketName string, eventData event.Event, targetIDs ...event.TargetID) (errs []event.TargetIDErr) { + errCh := sys.targetList.Send(eventData, targetIDs...) + for terr := range errCh { + errs = append(errs, terr) + if sys.RemoteTargetExist(bucketName, terr.ID) { + sys.RemoveRemoteTarget(bucketName, terr.ID) } } - return errMap + return errs } // Send - sends event data to all matching targets. -func (sys *NotificationSys) Send(args eventArgs) map[event.TargetID]error { +func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr { sys.RLock() targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name) sys.RUnlock() @@ -516,12 +527,12 @@ func sendEvent(args eventArgs) { return } - for targetID, err := range globalNotificationSys.Send(args) { + for _, err := range globalNotificationSys.Send(args) { reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name} reqInfo.AppendTags("EventName", args.EventName.String()) - reqInfo.AppendTags("targetID", targetID.Name) + reqInfo.AppendTags("targetID", err.ID.Name) ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) + logger.LogIf(ctx, err.Err) } } diff --git a/cmd/peer-rpc.go b/cmd/peer-rpc.go index c7f1ebed9..0c7f59ebf 100644 --- a/cmd/peer-rpc.go +++ b/cmd/peer-rpc.go @@ -163,18 +163,12 @@ func (receiver *PeerRPCReceiver) SendEvent(args *SendEventArgs, reply *SendEvent } var err error - if errMap := globalNotificationSys.send(args.BucketName, args.Event, args.TargetID); len(errMap) != 0 { - var found bool - if err, found = errMap[args.TargetID]; !found { - return fmt.Errorf("error for target %v not found in error map %+v", args.TargetID, errMap) - } - } - - if err != nil { + for _, terr := range globalNotificationSys.send(args.BucketName, args.Event, args.TargetID) { reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String()) reqInfo.AppendTags("targetName", args.TargetID.Name) ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) + logger.LogIf(ctx, terr.Err) + err = terr.Err } reply.Error = err diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 622bafb02..1d5e41ab9 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -169,7 +169,7 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs, globalNotificationSys.RemoveNotification(args.BucketName) globalPolicySys.Remove(args.BucketName) - for _, nerr := range globalNotificationSys.DeleteBucket(args.BucketName) { + for nerr := range globalNotificationSys.DeleteBucket(args.BucketName) { logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) logger.LogIf(ctx, nerr.Err) } @@ -943,7 +943,7 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic } globalPolicySys.Set(args.BucketName, *bucketPolicy) - for _, nerr := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) { + for nerr := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) { logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) logger.LogIf(ctx, nerr.Err) } diff --git a/pkg/event/targetlist.go b/pkg/event/targetlist.go index 35f5a4cb2..96a29b828 100644 --- a/pkg/event/targetlist.go +++ b/pkg/event/targetlist.go @@ -56,32 +56,47 @@ func (list *TargetList) Exists(id TargetID) bool { return found } +// TargetIDErr returns error associated for a targetID +type TargetIDErr struct { + // ID where the remove or send were initiated. + ID TargetID + // Stores any error while removing a target or while sending an event. + Err error +} + // Remove - closes and removes targets by given target IDs. -func (list *TargetList) Remove(ids ...TargetID) map[TargetID]error { +func (list *TargetList) Remove(targetids ...TargetID) <-chan TargetIDErr { list.Lock() defer list.Unlock() - errors := make(map[TargetID]error) + errCh := make(chan TargetIDErr) - var wg sync.WaitGroup - for _, id := range ids { - if target, ok := list.targets[id]; ok { - wg.Add(1) - go func(id TargetID, target Target) { - defer wg.Done() - if err := target.Close(); err != nil { - errors[id] = err - } - }(id, target) + go func() { + defer close(errCh) + + var wg sync.WaitGroup + for _, id := range targetids { + if target, ok := list.targets[id]; ok { + wg.Add(1) + go func(id TargetID, target Target) { + defer wg.Done() + if err := target.Close(); err != nil { + errCh <- TargetIDErr{ + ID: id, + Err: err, + } + } + }(id, target) + } } - } - wg.Wait() + wg.Wait() - for _, id := range ids { - delete(list.targets, id) - } + for _, id := range targetids { + delete(list.targets, id) + } + }() - return errors + return errCh } // List - returns available target IDs. @@ -98,27 +113,34 @@ func (list *TargetList) List() []TargetID { } // Send - sends events to targets identified by target IDs. -func (list *TargetList) Send(event Event, targetIDs ...TargetID) map[TargetID]error { +func (list *TargetList) Send(event Event, targetIDs ...TargetID) <-chan TargetIDErr { list.Lock() defer list.Unlock() - errors := make(map[TargetID]error) + errCh := make(chan TargetIDErr) - var wg sync.WaitGroup - for _, id := range targetIDs { - if target, ok := list.targets[id]; ok { - wg.Add(1) - go func(id TargetID, target Target) { - defer wg.Done() - if err := target.Send(event); err != nil { - errors[id] = err - } - }(id, target) + go func() { + defer close(errCh) + + var wg sync.WaitGroup + for _, id := range targetIDs { + if target, ok := list.targets[id]; ok { + wg.Add(1) + go func(id TargetID, target Target) { + defer wg.Done() + if err := target.Send(event); err != nil { + errCh <- TargetIDErr{ + ID: id, + Err: err, + } + } + }(id, target) + } } - } - wg.Wait() + wg.Wait() + }() - return errors + return errCh } // NewTargetList - creates TargetList. diff --git a/pkg/event/targetlist_test.go b/pkg/event/targetlist_test.go index 56dc0d6dd..e3bfd3051 100644 --- a/pkg/event/targetlist_test.go +++ b/pkg/event/targetlist_test.go @@ -168,9 +168,9 @@ func TestTargetListRemove(t *testing.T) { } for i, testCase := range testCases { - errors := testCase.targetList.Remove(testCase.targetID) - err := errors[testCase.targetID] - expectErr := (err != nil) + errCh := testCase.targetList.Remove(testCase.targetID) + err := <-errCh + expectErr := (err.Err != nil) if expectErr != testCase.expectErr { t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr) @@ -255,9 +255,9 @@ func TestTargetListSend(t *testing.T) { } for i, testCase := range testCases { - errors := testCase.targetList.Send(Event{}, testCase.targetID) - err := errors[testCase.targetID] - expectErr := (err != nil) + errCh := testCase.targetList.Send(Event{}, testCase.targetID) + err := <-errCh + expectErr := (err.Err != nil) if expectErr != testCase.expectErr { t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)