Fix tracing send on closed channel (#18982)

Depending on when the context cancelation is picked up the handler may return and close the channel before `SubscribeJSON` returns, causing:

```
Feb 05 17:12:00 s3-us-node11 minio[3973657]: panic: send on closed channel
Feb 05 17:12:00 s3-us-node11 minio[3973657]: goroutine 378007076 [running]:
Feb 05 17:12:00 s3-us-node11 minio[3973657]: github.com/minio/minio/internal/pubsub.(*PubSub[...]).SubscribeJSON.func1()
Feb 05 17:12:00 s3-us-node11 minio[3973657]:         github.com/minio/minio/internal/pubsub/pubsub.go:139 +0x12d
Feb 05 17:12:00 s3-us-node11 minio[3973657]: created by github.com/minio/minio/internal/pubsub.(*PubSub[...]).SubscribeJSON in goroutine 378010884
Feb 05 17:12:00 s3-us-node11 minio[3973657]:         github.com/minio/minio/internal/pubsub/pubsub.go:124 +0x352
```

Wait explicitly for the goroutine to exit.

Bonus: Listen for doneCh when sending to not risk getting blocked there is channel isn't being emptied.
This commit is contained in:
Klaus Post 2024-02-06 08:57:30 -08:00 committed by GitHub
parent 630963fa6b
commit ebc6c9b498
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 40 additions and 23 deletions

View File

@ -1935,7 +1935,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
peers, _ := newPeerRestClients(globalEndpoints)
err = globalTrace.SubscribeJSON(traceOpts.TraceTypes(), traceCh, ctx.Done(), func(entry madmin.TraceInfo) bool {
return shouldTrace(entry, traceOpts)
})
}, nil)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return

View File

@ -30,6 +30,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -999,12 +1000,13 @@ func (s *peerRESTServer) TraceHandler(ctx context.Context, payload []byte, _ <-c
if err != nil {
return grid.NewRemoteErr(err)
}
var wg sync.WaitGroup
// Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers.
// Use buffered channel to take care of burst sends or slow w.Write()
err = globalTrace.SubscribeJSON(traceOpts.TraceTypes(), out, ctx.Done(), func(entry madmin.TraceInfo) bool {
return shouldTrace(entry, traceOpts)
})
}, &wg)
if err != nil {
return grid.NewRemoteErr(err)
}
@ -1013,8 +1015,9 @@ func (s *peerRESTServer) TraceHandler(ctx context.Context, payload []byte, _ <-c
if traceOpts.TraceTypes().Contains(madmin.TraceBootstrap) {
go globalBootstrapTracer.Publish(ctx, globalTrace)
}
// Wait for remote to cancel.
<-ctx.Done()
// Wait for remote to cancel and SubscribeJSON to exit.
wg.Wait()
return nil
}

View File

@ -104,7 +104,7 @@ func (ps *PubSub[T, M]) Subscribe(mask M, subCh chan T, doneCh <-chan struct{},
}
// SubscribeJSON - Adds a subscriber to pubsub system and returns results with JSON encoding.
func (ps *PubSub[T, M]) SubscribeJSON(mask M, subCh chan<- []byte, doneCh <-chan struct{}, filter func(entry T) bool) error {
func (ps *PubSub[T, M]) SubscribeJSON(mask M, subCh chan<- []byte, doneCh <-chan struct{}, filter func(entry T) bool, wg *sync.WaitGroup) error {
totalSubs := atomic.AddInt32(&ps.numSubscribers, 1)
if ps.maxSubscribers > 0 && totalSubs > ps.maxSubscribers {
atomic.AddInt32(&ps.numSubscribers, -1)
@ -120,40 +120,54 @@ func (ps *PubSub[T, M]) SubscribeJSON(mask M, subCh chan<- []byte, doneCh <-chan
combined := Mask(atomic.LoadUint64(&ps.types))
combined.Merge(Mask(mask.Mask()))
atomic.StoreUint64(&ps.types, uint64(combined))
if wg != nil {
wg.Add(1)
}
go func() {
defer func() {
if wg != nil {
wg.Done()
}
// Clean up and de-register the subscriber
ps.Lock()
defer ps.Unlock()
var remainTypes Mask
for i, s := range ps.subs {
if s == sub {
ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
} else {
remainTypes.Merge(s.types)
}
}
atomic.StoreUint64(&ps.types, uint64(remainTypes))
atomic.AddInt32(&ps.numSubscribers, -1)
}()
// Read from subChT and write to subCh
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
for {
select {
case <-doneCh:
return
case v, ok := <-subChT:
if !ok {
break
return
}
buf.Reset()
err := enc.Encode(v)
if err != nil {
break
return
}
subCh <- append(GetByteBuffer()[:0], buf.Bytes()...)
continue
}
break
}
ps.Lock()
defer ps.Unlock()
var remainTypes Mask
for i, s := range ps.subs {
if s == sub {
ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
} else {
remainTypes.Merge(s.types)
select {
case subCh <- append(GetByteBuffer()[:0], buf.Bytes()...):
continue
case <-doneCh:
return
}
}
}
atomic.StoreUint64(&ps.types, uint64(remainTypes))
atomic.AddInt32(&ps.numSubscribers, -1)
}()
return nil