mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: Avoid more crashes due to concurrent map usage (#5912)
This PR fixes another situation where a crash occurs thanks to @krishnasrinivas for reproducing this Fixes #5897
This commit is contained in:
parent
c525424179
commit
4886bfbc72
@ -655,7 +655,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
|
|||||||
|
|
||||||
globalNotificationSys.RemoveNotification(bucket)
|
globalNotificationSys.RemoveNotification(bucket)
|
||||||
globalPolicySys.Remove(bucket)
|
globalPolicySys.Remove(bucket)
|
||||||
for _, nerr := range globalNotificationSys.DeleteBucket(bucket) {
|
for nerr := range globalNotificationSys.DeleteBucket(bucket) {
|
||||||
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
}
|
}
|
||||||
|
@ -146,7 +146,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
|
|||||||
|
|
||||||
rulesMap := config.ToRulesMap()
|
rulesMap := config.ToRulesMap()
|
||||||
globalNotificationSys.AddRulesMap(bucketName, rulesMap)
|
globalNotificationSys.AddRulesMap(bucketName, rulesMap)
|
||||||
for _, nerr := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) {
|
for nerr := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) {
|
||||||
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
}
|
}
|
||||||
@ -251,8 +251,8 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
errs := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr)
|
errCh := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr)
|
||||||
for _, nerr := range errs {
|
for nerr := range errCh {
|
||||||
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
}
|
}
|
||||||
|
@ -91,7 +91,7 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
|
|||||||
}
|
}
|
||||||
|
|
||||||
globalPolicySys.Set(bucket, *bucketPolicy)
|
globalPolicySys.Set(bucket, *bucketPolicy)
|
||||||
for _, nerr := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) {
|
for nerr := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) {
|
||||||
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
}
|
}
|
||||||
@ -130,7 +130,7 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
|
|
||||||
globalPolicySys.Remove(bucket)
|
globalPolicySys.Remove(bucket)
|
||||||
for _, nerr := range globalNotificationSys.RemoveBucketPolicy(bucket) {
|
for nerr := range globalNotificationSys.RemoveBucketPolicy(bucket) {
|
||||||
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
}
|
}
|
||||||
|
@ -66,119 +66,129 @@ type NotificationPeerErr struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteBucket - calls DeleteBucket RPC call on all peers.
|
// DeleteBucket - calls DeleteBucket RPC call on all peers.
|
||||||
func (sys *NotificationSys) DeleteBucket(bucketName string) []NotificationPeerErr {
|
func (sys *NotificationSys) DeleteBucket(bucketName string) <-chan NotificationPeerErr {
|
||||||
errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
|
errCh := make(chan NotificationPeerErr)
|
||||||
var wg sync.WaitGroup
|
go func() {
|
||||||
idx := 0
|
defer close(errCh)
|
||||||
for addr, client := range sys.peerRPCClientMap {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(idx int, addr xnet.Host, client *PeerRPCClient) {
|
|
||||||
defer wg.Done()
|
|
||||||
if err := client.DeleteBucket(bucketName); err != nil {
|
|
||||||
errs[idx] = NotificationPeerErr{
|
|
||||||
Host: addr,
|
|
||||||
Err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(idx, addr, client)
|
|
||||||
idx++
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
return errs
|
var wg sync.WaitGroup
|
||||||
|
for addr, client := range sys.peerRPCClientMap {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(addr xnet.Host, client *PeerRPCClient) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := client.DeleteBucket(bucketName); err != nil {
|
||||||
|
errCh <- NotificationPeerErr{
|
||||||
|
Host: addr,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(addr, client)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetBucketPolicy - calls SetBucketPolicy RPC call on all peers.
|
// SetBucketPolicy - calls SetBucketPolicy RPC call on all peers.
|
||||||
func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) []NotificationPeerErr {
|
func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) <-chan NotificationPeerErr {
|
||||||
errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
|
errCh := make(chan NotificationPeerErr)
|
||||||
var wg sync.WaitGroup
|
go func() {
|
||||||
idx := 0
|
defer close(errCh)
|
||||||
for addr, client := range sys.peerRPCClientMap {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(idx int, addr xnet.Host, client *PeerRPCClient) {
|
|
||||||
defer wg.Done()
|
|
||||||
if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil {
|
|
||||||
errs[idx] = NotificationPeerErr{
|
|
||||||
Host: addr,
|
|
||||||
Err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(idx, addr, client)
|
|
||||||
idx++
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
return errs
|
var wg sync.WaitGroup
|
||||||
|
for addr, client := range sys.peerRPCClientMap {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(addr xnet.Host, client *PeerRPCClient) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil {
|
||||||
|
errCh <- NotificationPeerErr{
|
||||||
|
Host: addr,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(addr, client)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers.
|
// RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers.
|
||||||
func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) []NotificationPeerErr {
|
func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) <-chan NotificationPeerErr {
|
||||||
errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
|
errCh := make(chan NotificationPeerErr)
|
||||||
var wg sync.WaitGroup
|
go func() {
|
||||||
idx := 0
|
defer close(errCh)
|
||||||
for addr, client := range sys.peerRPCClientMap {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(idx int, addr xnet.Host, client *PeerRPCClient) {
|
|
||||||
defer wg.Done()
|
|
||||||
if err := client.RemoveBucketPolicy(bucketName); err != nil {
|
|
||||||
errs[idx] = NotificationPeerErr{
|
|
||||||
Host: addr,
|
|
||||||
Err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(idx, addr, client)
|
|
||||||
idx++
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
return errs
|
var wg sync.WaitGroup
|
||||||
|
for addr, client := range sys.peerRPCClientMap {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(addr xnet.Host, client *PeerRPCClient) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := client.RemoveBucketPolicy(bucketName); err != nil {
|
||||||
|
errCh <- NotificationPeerErr{
|
||||||
|
Host: addr,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(addr, client)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutBucketNotification - calls PutBucketNotification RPC call on all peers.
|
// PutBucketNotification - calls PutBucketNotification RPC call on all peers.
|
||||||
func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) []NotificationPeerErr {
|
func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) <-chan NotificationPeerErr {
|
||||||
errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
|
errCh := make(chan NotificationPeerErr)
|
||||||
var wg sync.WaitGroup
|
go func() {
|
||||||
idx := 0
|
defer close(errCh)
|
||||||
for addr, client := range sys.peerRPCClientMap {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(idx int, addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) {
|
|
||||||
defer wg.Done()
|
|
||||||
if err := client.PutBucketNotification(bucketName, rulesMap); err != nil {
|
|
||||||
errs[idx] = NotificationPeerErr{
|
|
||||||
Host: addr,
|
|
||||||
Err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(idx, addr, client, rulesMap.Clone())
|
|
||||||
idx++
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
return errs
|
var wg sync.WaitGroup
|
||||||
|
for addr, client := range sys.peerRPCClientMap {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := client.PutBucketNotification(bucketName, rulesMap); err != nil {
|
||||||
|
errCh <- NotificationPeerErr{
|
||||||
|
Host: addr,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(addr, client, rulesMap.Clone())
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenBucketNotification - calls ListenBucketNotification RPC call on all peers.
|
// ListenBucketNotification - calls ListenBucketNotification RPC call on all peers.
|
||||||
func (sys *NotificationSys) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string,
|
func (sys *NotificationSys) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string,
|
||||||
targetID event.TargetID, localPeer xnet.Host) []NotificationPeerErr {
|
targetID event.TargetID, localPeer xnet.Host) <-chan NotificationPeerErr {
|
||||||
errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
|
errCh := make(chan NotificationPeerErr)
|
||||||
var wg sync.WaitGroup
|
go func() {
|
||||||
idx := 0
|
defer close(errCh)
|
||||||
for addr, client := range sys.peerRPCClientMap {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(idx int, addr xnet.Host, client *PeerRPCClient) {
|
|
||||||
defer wg.Done()
|
|
||||||
if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil {
|
|
||||||
errs[idx] = NotificationPeerErr{
|
|
||||||
Host: addr,
|
|
||||||
Err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(idx, addr, client)
|
|
||||||
idx++
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
return errs
|
var wg sync.WaitGroup
|
||||||
|
for addr, client := range sys.peerRPCClientMap {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(addr xnet.Host, client *PeerRPCClient) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil {
|
||||||
|
errCh <- NotificationPeerErr{
|
||||||
|
Host: addr,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(addr, client)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name.
|
// AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name.
|
||||||
@ -382,10 +392,10 @@ func (sys *NotificationSys) RemoveAllRemoteTargets() {
|
|||||||
|
|
||||||
// RemoveRemoteTarget - closes and removes target by target ID.
|
// RemoveRemoteTarget - closes and removes target by target ID.
|
||||||
func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event.TargetID) {
|
func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event.TargetID) {
|
||||||
for id, err := range sys.targetList.Remove(targetID) {
|
for terr := range sys.targetList.Remove(targetID) {
|
||||||
reqInfo := (&logger.ReqInfo{}).AppendTags("targetID", id.Name)
|
reqInfo := (&logger.ReqInfo{}).AppendTags("targetID", terr.ID.Name)
|
||||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, terr.Err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sys.Lock()
|
sys.Lock()
|
||||||
@ -399,19 +409,20 @@ func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sys *NotificationSys) send(bucketName string, eventData event.Event, targetIDs ...event.TargetID) map[event.TargetID]error {
|
func (sys *NotificationSys) send(bucketName string, eventData event.Event, targetIDs ...event.TargetID) (errs []event.TargetIDErr) {
|
||||||
errMap := sys.targetList.Send(eventData, targetIDs...)
|
errCh := sys.targetList.Send(eventData, targetIDs...)
|
||||||
for targetID := range errMap {
|
for terr := range errCh {
|
||||||
if sys.RemoteTargetExist(bucketName, targetID) {
|
errs = append(errs, terr)
|
||||||
sys.RemoveRemoteTarget(bucketName, targetID)
|
if sys.RemoteTargetExist(bucketName, terr.ID) {
|
||||||
|
sys.RemoveRemoteTarget(bucketName, terr.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return errMap
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send - sends event data to all matching targets.
|
// Send - sends event data to all matching targets.
|
||||||
func (sys *NotificationSys) Send(args eventArgs) map[event.TargetID]error {
|
func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
|
||||||
sys.RLock()
|
sys.RLock()
|
||||||
targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
|
targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
|
||||||
sys.RUnlock()
|
sys.RUnlock()
|
||||||
@ -516,12 +527,12 @@ func sendEvent(args eventArgs) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for targetID, err := range globalNotificationSys.Send(args) {
|
for _, err := range globalNotificationSys.Send(args) {
|
||||||
reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name}
|
reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name}
|
||||||
reqInfo.AppendTags("EventName", args.EventName.String())
|
reqInfo.AppendTags("EventName", args.EventName.String())
|
||||||
reqInfo.AppendTags("targetID", targetID.Name)
|
reqInfo.AppendTags("targetID", err.ID.Name)
|
||||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err.Err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,18 +163,12 @@ func (receiver *PeerRPCReceiver) SendEvent(args *SendEventArgs, reply *SendEvent
|
|||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
if errMap := globalNotificationSys.send(args.BucketName, args.Event, args.TargetID); len(errMap) != 0 {
|
for _, terr := range globalNotificationSys.send(args.BucketName, args.Event, args.TargetID) {
|
||||||
var found bool
|
|
||||||
if err, found = errMap[args.TargetID]; !found {
|
|
||||||
return fmt.Errorf("error for target %v not found in error map %+v", args.TargetID, errMap)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String())
|
reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String())
|
||||||
reqInfo.AppendTags("targetName", args.TargetID.Name)
|
reqInfo.AppendTags("targetName", args.TargetID.Name)
|
||||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, terr.Err)
|
||||||
|
err = terr.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.Error = err
|
reply.Error = err
|
||||||
|
@ -169,7 +169,7 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs,
|
|||||||
|
|
||||||
globalNotificationSys.RemoveNotification(args.BucketName)
|
globalNotificationSys.RemoveNotification(args.BucketName)
|
||||||
globalPolicySys.Remove(args.BucketName)
|
globalPolicySys.Remove(args.BucketName)
|
||||||
for _, nerr := range globalNotificationSys.DeleteBucket(args.BucketName) {
|
for nerr := range globalNotificationSys.DeleteBucket(args.BucketName) {
|
||||||
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
}
|
}
|
||||||
@ -943,7 +943,7 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
|
|||||||
}
|
}
|
||||||
|
|
||||||
globalPolicySys.Set(args.BucketName, *bucketPolicy)
|
globalPolicySys.Set(args.BucketName, *bucketPolicy)
|
||||||
for _, nerr := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) {
|
for nerr := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) {
|
||||||
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
}
|
}
|
||||||
|
@ -56,32 +56,47 @@ func (list *TargetList) Exists(id TargetID) bool {
|
|||||||
return found
|
return found
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TargetIDErr returns error associated for a targetID
|
||||||
|
type TargetIDErr struct {
|
||||||
|
// ID where the remove or send were initiated.
|
||||||
|
ID TargetID
|
||||||
|
// Stores any error while removing a target or while sending an event.
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
// Remove - closes and removes targets by given target IDs.
|
// Remove - closes and removes targets by given target IDs.
|
||||||
func (list *TargetList) Remove(ids ...TargetID) map[TargetID]error {
|
func (list *TargetList) Remove(targetids ...TargetID) <-chan TargetIDErr {
|
||||||
list.Lock()
|
list.Lock()
|
||||||
defer list.Unlock()
|
defer list.Unlock()
|
||||||
|
|
||||||
errors := make(map[TargetID]error)
|
errCh := make(chan TargetIDErr)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
go func() {
|
||||||
for _, id := range ids {
|
defer close(errCh)
|
||||||
if target, ok := list.targets[id]; ok {
|
|
||||||
wg.Add(1)
|
var wg sync.WaitGroup
|
||||||
go func(id TargetID, target Target) {
|
for _, id := range targetids {
|
||||||
defer wg.Done()
|
if target, ok := list.targets[id]; ok {
|
||||||
if err := target.Close(); err != nil {
|
wg.Add(1)
|
||||||
errors[id] = err
|
go func(id TargetID, target Target) {
|
||||||
}
|
defer wg.Done()
|
||||||
}(id, target)
|
if err := target.Close(); err != nil {
|
||||||
|
errCh <- TargetIDErr{
|
||||||
|
ID: id,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(id, target)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
wg.Wait()
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
for _, id := range ids {
|
for _, id := range targetids {
|
||||||
delete(list.targets, id)
|
delete(list.targets, id)
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return errors
|
return errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// List - returns available target IDs.
|
// List - returns available target IDs.
|
||||||
@ -98,27 +113,34 @@ func (list *TargetList) List() []TargetID {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send - sends events to targets identified by target IDs.
|
// Send - sends events to targets identified by target IDs.
|
||||||
func (list *TargetList) Send(event Event, targetIDs ...TargetID) map[TargetID]error {
|
func (list *TargetList) Send(event Event, targetIDs ...TargetID) <-chan TargetIDErr {
|
||||||
list.Lock()
|
list.Lock()
|
||||||
defer list.Unlock()
|
defer list.Unlock()
|
||||||
|
|
||||||
errors := make(map[TargetID]error)
|
errCh := make(chan TargetIDErr)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
go func() {
|
||||||
for _, id := range targetIDs {
|
defer close(errCh)
|
||||||
if target, ok := list.targets[id]; ok {
|
|
||||||
wg.Add(1)
|
var wg sync.WaitGroup
|
||||||
go func(id TargetID, target Target) {
|
for _, id := range targetIDs {
|
||||||
defer wg.Done()
|
if target, ok := list.targets[id]; ok {
|
||||||
if err := target.Send(event); err != nil {
|
wg.Add(1)
|
||||||
errors[id] = err
|
go func(id TargetID, target Target) {
|
||||||
}
|
defer wg.Done()
|
||||||
}(id, target)
|
if err := target.Send(event); err != nil {
|
||||||
|
errCh <- TargetIDErr{
|
||||||
|
ID: id,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(id, target)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
wg.Wait()
|
||||||
wg.Wait()
|
}()
|
||||||
|
|
||||||
return errors
|
return errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTargetList - creates TargetList.
|
// NewTargetList - creates TargetList.
|
||||||
|
@ -168,9 +168,9 @@ func TestTargetListRemove(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
errors := testCase.targetList.Remove(testCase.targetID)
|
errCh := testCase.targetList.Remove(testCase.targetID)
|
||||||
err := errors[testCase.targetID]
|
err := <-errCh
|
||||||
expectErr := (err != nil)
|
expectErr := (err.Err != nil)
|
||||||
|
|
||||||
if expectErr != testCase.expectErr {
|
if expectErr != testCase.expectErr {
|
||||||
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
|
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
|
||||||
@ -255,9 +255,9 @@ func TestTargetListSend(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
errors := testCase.targetList.Send(Event{}, testCase.targetID)
|
errCh := testCase.targetList.Send(Event{}, testCase.targetID)
|
||||||
err := errors[testCase.targetID]
|
err := <-errCh
|
||||||
expectErr := (err != nil)
|
expectErr := (err.Err != nil)
|
||||||
|
|
||||||
if expectErr != testCase.expectErr {
|
if expectErr != testCase.expectErr {
|
||||||
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
|
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
|
||||||
|
Loading…
Reference in New Issue
Block a user