Improve tracing & notification scalability (#18903)

* Perform JSON encoding on remote machines and only forward byte slices.
* Migrate tracing & notification to WebSockets.
This commit is contained in:
Klaus Post 2024-01-30 12:49:02 -08:00 committed by GitHub
parent 80ca120088
commit 6da4a9c7bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 451 additions and 213 deletions

View File

@ -50,6 +50,7 @@ import (
"github.com/minio/madmin-go/v3/estream" "github.com/minio/madmin-go/v3/estream"
"github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/dsync" "github.com/minio/minio/internal/dsync"
"github.com/minio/minio/internal/grid"
"github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/handlers"
xhttp "github.com/minio/minio/internal/http" xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil" xioutil "github.com/minio/minio/internal/ioutil"
@ -1909,14 +1910,11 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
setEventStreamHeaders(w) setEventStreamHeaders(w)
// Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers. // Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers.
// Use buffered channel to take care of burst sends or slow w.Write() // Keep 100k buffered channel.
// If receiver cannot keep up with that we drop events.
// Keep 100k buffered channel, should be sufficient to ensure we do not lose any events. traceCh := make(chan []byte, 100000)
traceCh := make(chan madmin.TraceInfo, 100000)
peers, _ := newPeerRestClients(globalEndpoints) peers, _ := newPeerRestClients(globalEndpoints)
err = globalTrace.SubscribeJSON(traceOpts.TraceTypes(), traceCh, ctx.Done(), func(entry madmin.TraceInfo) bool {
err = globalTrace.Subscribe(traceOpts.TraceTypes(), traceCh, ctx.Done(), func(entry madmin.TraceInfo) bool {
return shouldTrace(entry, traceOpts) return shouldTrace(entry, traceOpts)
}) })
if err != nil { if err != nil {
@ -1933,19 +1931,19 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
if peer == nil { if peer == nil {
continue continue
} }
peer.Trace(traceCh, ctx.Done(), traceOpts) peer.Trace(ctx, traceCh, traceOpts)
} }
keepAliveTicker := time.NewTicker(time.Second) keepAliveTicker := time.NewTicker(time.Second)
defer keepAliveTicker.Stop() defer keepAliveTicker.Stop()
enc := json.NewEncoder(w)
for { for {
select { select {
case entry := <-traceCh: case entry := <-traceCh:
if err := enc.Encode(entry); err != nil { if _, err := w.Write(entry); err != nil {
return return
} }
grid.PutByteBuffer(entry)
if len(traceCh) == 0 { if len(traceCh) == 0 {
// Flush if nothing is queued // Flush if nothing is queued
w.(http.Flusher).Flush() w.(http.Flusher).Flush()

View File

@ -411,6 +411,29 @@ func (l EndpointServerPools) GridHosts() (gridHosts []string, gridLocal string)
return gridHosts, gridLocal return gridHosts, gridLocal
} }
// FindGridHostsFromPeer will return a matching peer from provided peer.
func (l EndpointServerPools) FindGridHostsFromPeer(peer *xnet.Host) (peerGrid string) {
if peer == nil {
return ""
}
for _, ep := range l {
for _, endpoint := range ep.Endpoints {
if endpoint.IsLocal {
continue
}
host, err := xnet.ParseHost(endpoint.Host)
if err != nil {
continue
}
if host.String() == peer.String() {
return endpoint.GridHost()
}
}
}
return ""
}
// Hostnames - returns list of unique hostnames // Hostnames - returns list of unique hostnames
func (l EndpointServerPools) Hostnames() []string { func (l EndpointServerPools) Hostnames() []string {
foundSet := set.NewStringSet() foundSet := set.NewStringSet()

View File

@ -34,6 +34,7 @@ import (
"github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/config/browser" "github.com/minio/minio/internal/config/browser"
"github.com/minio/minio/internal/grid"
"github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/handlers"
"github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/kms"
"go.uber.org/atomic" "go.uber.org/atomic"
@ -130,6 +131,11 @@ const (
tlsClientSessionCacheSize = 100 tlsClientSessionCacheSize = 100
) )
func init() {
// Injected to prevent circular dependency.
pubsub.GetByteBuffer = grid.GetByteBuffer
}
type poolDisksLayout struct { type poolDisksLayout struct {
cmdline string cmdline string
layout [][]string layout [][]string

View File

@ -18,12 +18,14 @@
package cmd package cmd
import ( import (
"bytes"
"encoding/json" "encoding/json"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/grid"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/pubsub" "github.com/minio/minio/internal/pubsub"
"github.com/minio/mux" "github.com/minio/mux"
@ -116,11 +118,32 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
// Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers. // Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers.
// Use buffered channel to take care of burst sends or slow w.Write() // Use buffered channel to take care of burst sends or slow w.Write()
listenCh := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity()*len(globalEndpoints.Hostnames())) mergeCh := make(chan []byte, globalAPIConfig.getRequestsPoolCapacity()*len(globalEndpoints.Hostnames()))
localCh := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity())
// Convert local messages to JSON and send to mergeCh
go func() {
buf := bytes.NewBuffer(grid.GetByteBuffer()[:0])
enc := json.NewEncoder(buf)
tmpEvt := struct{ Records []event.Event }{[]event.Event{{}}}
for {
select {
case ev := <-localCh:
buf.Reset()
tmpEvt.Records[0] = ev
if err := enc.Encode(tmpEvt); err != nil {
logger.LogOnceIf(ctx, err, "event: Encode failed")
continue
}
mergeCh <- append(grid.GetByteBuffer()[:0], buf.Bytes()...)
case <-ctx.Done():
grid.PutByteBuffer(buf.Bytes())
return
}
}
}()
peers, _ := newPeerRestClients(globalEndpoints) peers, _ := newPeerRestClients(globalEndpoints)
err := globalHTTPListen.Subscribe(mask, localCh, ctx.Done(), func(ev event.Event) bool {
err := globalHTTPListen.Subscribe(mask, listenCh, ctx.Done(), func(ev event.Event) bool {
if ev.S3.Bucket.Name != "" && bucketName != "" { if ev.S3.Bucket.Name != "" && bucketName != "" {
if ev.S3.Bucket.Name != bucketName { if ev.S3.Bucket.Name != bucketName {
return false return false
@ -139,7 +162,7 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
if peer == nil { if peer == nil {
continue continue
} }
peer.Listen(listenCh, ctx.Done(), values) peer.Listen(ctx, mergeCh, values)
} }
var ( var (
@ -170,14 +193,16 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
for { for {
select { select {
case ev := <-listenCh: case ev := <-mergeCh:
if err := enc.Encode(struct{ Records []event.Event }{[]event.Event{ev}}); err != nil { _, err := w.Write(ev)
if err != nil {
return return
} }
if len(listenCh) == 0 { if len(mergeCh) == 0 {
// Flush if nothing is queued // Flush if nothing is queued
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
grid.PutByteBuffer(ev)
case <-emptyEventTicker: case <-emptyEventTicker:
if err := enc.Encode(struct{ Records []event.Event }{}); err != nil { if err := enc.Encode(struct{ Records []event.Event }{}); err != nil {
return return

View File

@ -22,19 +22,20 @@ import (
"context" "context"
"encoding/gob" "encoding/gob"
"encoding/hex" "encoding/hex"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time" "time"
"github.com/minio/madmin-go/v3" "github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/grid"
xhttp "github.com/minio/minio/internal/http" xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest" "github.com/minio/minio/internal/rest"
"github.com/minio/pkg/v2/logger/message/log" "github.com/minio/pkg/v2/logger/message/log"
@ -46,6 +47,66 @@ import (
type peerRESTClient struct { type peerRESTClient struct {
host *xnet.Host host *xnet.Host
restClient *rest.Client restClient *rest.Client
gridHost string
// Function that returns the grid connection for this peer when initialized.
// Will return nil if the grid connection is not initialized yet.
gridConn func() *grid.Connection
}
// Returns a peer rest client.
func newPeerRESTClient(peer *xnet.Host, gridHost string) *peerRESTClient {
scheme := "http"
if globalIsTLS {
scheme = "https"
}
serverURL := &url.URL{
Scheme: scheme,
Host: peer.String(),
Path: peerRESTPath,
}
restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
// Use a separate client to avoid recursive calls.
healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
healthClient.NoMetrics = true
// Construct a new health function.
restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(context.Background(), restClient.HealthCheckTimeout)
defer cancel()
respBody, err := healthClient.Call(ctx, peerRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody)
return !isNetworkError(err)
}
var gridConn atomic.Pointer[grid.Connection]
return &peerRESTClient{
host: peer, restClient: restClient, gridHost: gridHost,
gridConn: func() *grid.Connection {
// Lazy initialization of grid connection.
// When we create this peer client, the grid connection is likely not yet initialized.
if gridHost == "" {
logger.LogOnceIf(context.Background(), fmt.Errorf("gridHost is empty for peer %s", peer.String()), peer.String()+":gridHost")
return nil
}
gc := gridConn.Load()
if gc != nil {
return gc
}
gm := globalGrid.Load()
if gm == nil {
return nil
}
gc = gm.Connection(gridHost)
if gc == nil {
logger.LogOnceIf(context.Background(), fmt.Errorf("gridHost %q not found for peer %s", gridHost, peer.String()), peer.String()+":gridHost")
return nil
}
gridConn.Store(gc)
return gc
},
}
} }
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
@ -254,7 +315,7 @@ func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Me
go func(ch chan<- Metric) { go func(ch chan<- Metric) {
defer func() { defer func() {
xhttp.DrainBody(respBody) xhttp.DrainBody(respBody)
xioutil.SafeClose(ch) close(ch)
}() }()
for { for {
var metric Metric var metric Metric
@ -618,92 +679,61 @@ func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) erro
return nil return nil
} }
func (client *peerRESTClient) doTrace(traceCh chan<- madmin.TraceInfo, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) { func (client *peerRESTClient) doTrace(ctx context.Context, traceCh chan<- []byte, traceOpts madmin.ServiceTraceOpts) {
values := make(url.Values) gridConn := client.gridConn()
traceOpts.AddParams(values) if gridConn == nil {
// To cancel the REST request in case doneCh gets closed.
ctx, cancel := context.WithCancel(GlobalContext)
cancelCh := make(chan struct{})
defer xioutil.SafeClose(cancelCh)
go func() {
select {
case <-doneCh:
case <-cancelCh:
// There was an error in the REST request.
}
cancel()
}()
respBody, err := client.callWithContext(ctx, peerRESTMethodTrace, values, nil, -1)
defer xhttp.DrainBody(respBody)
if err != nil {
return return
} }
dec := gob.NewDecoder(respBody) payload, err := json.Marshal(traceOpts)
for { if err != nil {
var info madmin.TraceInfo logger.LogIf(ctx, err)
if err = dec.Decode(&info); err != nil { return
return
}
if len(info.NodeName) > 0 {
select {
case traceCh <- info:
default:
// Do not block on slow receivers.
}
}
} }
st, err := gridConn.NewStream(ctx, grid.HandlerTrace, payload)
if err != nil {
return
}
st.Results(func(b []byte) error {
select {
case traceCh <- b:
default:
// Do not block on slow receivers.
// Just recycle the buffer.
grid.PutByteBuffer(b)
}
return nil
})
} }
func (client *peerRESTClient) doListen(listenCh chan<- event.Event, doneCh <-chan struct{}, v url.Values) { func (client *peerRESTClient) doListen(ctx context.Context, listenCh chan<- []byte, v url.Values) {
// To cancel the REST request in case doneCh gets closed. conn := client.gridConn()
ctx, cancel := context.WithCancel(GlobalContext) if conn == nil {
return
cancelCh := make(chan struct{}) }
defer xioutil.SafeClose(cancelCh) st, err := listenHandler.Call(ctx, conn, grid.NewURLValuesWith(v))
go func() {
select {
case <-doneCh:
case <-cancelCh:
// There was an error in the REST request.
}
cancel()
}()
respBody, err := client.callWithContext(ctx, peerRESTMethodListen, v, nil, -1)
defer xhttp.DrainBody(respBody)
if err != nil { if err != nil {
return return
} }
st.Results(func(b *grid.Bytes) error {
dec := gob.NewDecoder(respBody) select {
for { case listenCh <- *b:
var ev event.Event default:
if err := dec.Decode(&ev); err != nil { // Do not block on slow receivers.
return b.Recycle()
} }
if len(ev.EventVersion) > 0 { return nil
select { })
case listenCh <- ev:
default:
// Do not block on slow receivers.
}
}
}
} }
// Listen - listen on peers. // Listen - listen on peers.
func (client *peerRESTClient) Listen(listenCh chan<- event.Event, doneCh <-chan struct{}, v url.Values) { func (client *peerRESTClient) Listen(ctx context.Context, listenCh chan<- []byte, v url.Values) {
go func() { go func() {
for { for {
client.doListen(listenCh, doneCh, v) client.doListen(ctx, listenCh, v)
select { select {
case <-doneCh: case <-ctx.Done():
return return
default: default:
// There was error in the REST request, retry after sometime as probably the peer is down. // There was error in the REST request, retry after sometime as probably the peer is down.
@ -714,12 +744,13 @@ func (client *peerRESTClient) Listen(listenCh chan<- event.Event, doneCh <-chan
} }
// Trace - send http trace request to peer nodes // Trace - send http trace request to peer nodes
func (client *peerRESTClient) Trace(traceCh chan<- madmin.TraceInfo, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) { func (client *peerRESTClient) Trace(ctx context.Context, traceCh chan<- []byte, traceOpts madmin.ServiceTraceOpts) {
go func() { go func() {
for { for {
client.doTrace(traceCh, doneCh, traceOpts) // Blocks until context is canceled or an error occurs.
client.doTrace(ctx, traceCh, traceOpts)
select { select {
case <-doneCh: case <-ctx.Done():
return return
default: default:
// There was error in the REST request, retry after sometime as probably the peer is down. // There was error in the REST request, retry after sometime as probably the peer is down.
@ -734,7 +765,7 @@ func (client *peerRESTClient) doConsoleLog(logCh chan log.Info, doneCh <-chan st
ctx, cancel := context.WithCancel(GlobalContext) ctx, cancel := context.WithCancel(GlobalContext)
cancelCh := make(chan struct{}) cancelCh := make(chan struct{})
defer xioutil.SafeClose(cancelCh) defer close(cancelCh)
go func() { go func() {
select { select {
case <-doneCh: case <-doneCh:
@ -789,6 +820,7 @@ func newPeerRestClients(endpoints EndpointServerPools) (remote, all []*peerRESTC
// Only useful in distributed setups // Only useful in distributed setups
return nil, nil return nil, nil
} }
hosts := endpoints.hostsSorted() hosts := endpoints.hostsSorted()
remote = make([]*peerRESTClient, 0, len(hosts)) remote = make([]*peerRESTClient, 0, len(hosts))
all = make([]*peerRESTClient, len(hosts)) all = make([]*peerRESTClient, len(hosts))
@ -796,7 +828,7 @@ func newPeerRestClients(endpoints EndpointServerPools) (remote, all []*peerRESTC
if host == nil { if host == nil {
continue continue
} }
all[i] = newPeerRESTClient(host) all[i] = newPeerRESTClient(host, endpoints.FindGridHostsFromPeer(host))
remote = append(remote, all[i]) remote = append(remote, all[i])
} }
if len(all) != len(remote)+1 { if len(all) != len(remote)+1 {
@ -805,36 +837,6 @@ func newPeerRestClients(endpoints EndpointServerPools) (remote, all []*peerRESTC
return remote, all return remote, all
} }
// Returns a peer rest client.
func newPeerRESTClient(peer *xnet.Host) *peerRESTClient {
scheme := "http"
if globalIsTLS {
scheme = "https"
}
serverURL := &url.URL{
Scheme: scheme,
Host: peer.String(),
Path: peerRESTPath,
}
restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
// Use a separate client to avoid recursive calls.
healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
healthClient.NoMetrics = true
// Construct a new health function.
restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(context.Background(), restClient.HealthCheckTimeout)
defer cancel()
respBody, err := healthClient.Call(ctx, peerRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody)
return !isNetworkError(err)
}
return &peerRESTClient{host: peer, restClient: restClient}
}
// MonitorBandwidth - send http trace request to peer nodes // MonitorBandwidth - send http trace request to peer nodes
func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []string) (*bandwidth.BucketBandwidthReport, error) { func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []string) (*bandwidth.BucketBandwidthReport, error) {
values := make(url.Values) values := make(url.Values)
@ -861,7 +863,7 @@ func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric
go func(ch chan<- Metric) { go func(ch chan<- Metric) {
defer func() { defer func() {
xhttp.DrainBody(respBody) xhttp.DrainBody(respBody)
xioutil.SafeClose(ch) close(ch)
}() }()
for { for {
var metric Metric var metric Metric
@ -888,7 +890,7 @@ func (client *peerRESTClient) GetPeerBucketMetrics(ctx context.Context) (<-chan
go func(ch chan<- Metric) { go func(ch chan<- Metric) {
defer func() { defer func() {
xhttp.DrainBody(respBody) xhttp.DrainBody(respBody)
xioutil.SafeClose(ch) close(ch)
}() }()
for { for {
var metric Metric var metric Metric
@ -1026,7 +1028,7 @@ func (client *peerRESTClient) GetReplicationMRF(ctx context.Context, bucket stri
go func(ch chan madmin.ReplicationMRF) { go func(ch chan madmin.ReplicationMRF) {
defer func() { defer func() {
xhttp.DrainBody(respBody) xhttp.DrainBody(respBody)
xioutil.SafeClose(ch) close(ch)
}() }()
for { for {
var entry madmin.ReplicationMRF var entry madmin.ReplicationMRF

View File

@ -18,6 +18,7 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"encoding/gob" "encoding/gob"
"encoding/hex" "encoding/hex"
@ -37,6 +38,7 @@ import (
"github.com/minio/madmin-go/v3" "github.com/minio/madmin-go/v3"
b "github.com/minio/minio/internal/bucket/bandwidth" b "github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/grid"
xioutil "github.com/minio/minio/internal/ioutil" xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/pubsub" "github.com/minio/minio/internal/pubsub"
@ -976,25 +978,23 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req
} }
} }
// Set an output capacity of 100 for listenHandler
// There is another buffer that will buffer events.
var listenHandler = grid.NewStream[*grid.URLValues, grid.NoPayload, *grid.Bytes](grid.HandlerListen,
grid.NewURLValues, nil, grid.NewBytes).WithOutCapacity(100)
// ListenHandler sends http trace messages back to peer rest client // ListenHandler sends http trace messages back to peer rest client
func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { func (s *peerRESTServer) ListenHandler(ctx context.Context, v *grid.URLValues, out chan<- *grid.Bytes) *grid.RemoteErr {
if !s.IsValid(w, r) { values := v.Values()
s.writeErrorResponse(w, errors.New("invalid request")) defer v.Recycle()
return
}
values := r.Form
var prefix string var prefix string
if len(values[peerRESTListenPrefix]) > 1 { if len(values[peerRESTListenPrefix]) > 1 {
s.writeErrorResponse(w, errors.New("invalid request")) return grid.NewRemoteErrString("invalid request (peerRESTListenPrefix)")
return
} }
globalAPIConfig.getRequestsPoolCapacity()
if len(values[peerRESTListenPrefix]) == 1 { if len(values[peerRESTListenPrefix]) == 1 {
if err := event.ValidateFilterRuleValue(values[peerRESTListenPrefix][0]); err != nil { if err := event.ValidateFilterRuleValue(values[peerRESTListenPrefix][0]); err != nil {
s.writeErrorResponse(w, err) return grid.NewRemoteErr(err)
return
} }
prefix = values[peerRESTListenPrefix][0] prefix = values[peerRESTListenPrefix][0]
@ -1002,14 +1002,12 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
var suffix string var suffix string
if len(values[peerRESTListenSuffix]) > 1 { if len(values[peerRESTListenSuffix]) > 1 {
s.writeErrorResponse(w, errors.New("invalid request")) return grid.NewRemoteErrString("invalid request (peerRESTListenSuffix)")
return
} }
if len(values[peerRESTListenSuffix]) == 1 { if len(values[peerRESTListenSuffix]) == 1 {
if err := event.ValidateFilterRuleValue(values[peerRESTListenSuffix][0]); err != nil { if err := event.ValidateFilterRuleValue(values[peerRESTListenSuffix][0]); err != nil {
s.writeErrorResponse(w, err) return grid.NewRemoteErr(err)
return
} }
suffix = values[peerRESTListenSuffix][0] suffix = values[peerRESTListenSuffix][0]
@ -1022,8 +1020,7 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
for _, ev := range values[peerRESTListenEvents] { for _, ev := range values[peerRESTListenEvents] {
eventName, err := event.ParseName(ev) eventName, err := event.ParseName(ev)
if err != nil { if err != nil {
s.writeErrorResponse(w, err) return grid.NewRemoteErr(err)
return
} }
mask.MergeMaskable(eventName) mask.MergeMaskable(eventName)
eventNames = append(eventNames, eventName) eventNames = append(eventNames, eventName)
@ -1031,13 +1028,10 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()}) rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()})
doneCh := r.Context().Done()
// Listen Publisher uses nonblocking publish and hence does not wait for slow subscribers. // Listen Publisher uses nonblocking publish and hence does not wait for slow subscribers.
// Use buffered channel to take care of burst sends or slow w.Write() // Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity()) ch := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity())
err := globalHTTPListen.Subscribe(mask, ch, ctx.Done(), func(ev event.Event) bool {
err := globalHTTPListen.Subscribe(mask, ch, doneCh, func(ev event.Event) bool {
if ev.S3.Bucket.Name != "" && values.Get(peerRESTListenBucket) != "" { if ev.S3.Bucket.Name != "" && values.Get(peerRESTListenBucket) != "" {
if ev.S3.Bucket.Name != values.Get(peerRESTListenBucket) { if ev.S3.Bucket.Name != values.Get(peerRESTListenBucket) {
return false return false
@ -1046,83 +1040,56 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key) return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key)
}) })
if err != nil { if err != nil {
s.writeErrorResponse(w, err) return grid.NewRemoteErr(err)
return
} }
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
enc := gob.NewEncoder(w) // Process until remote disconnects.
// Blocks on upstream (out) congestion.
// We have however a dynamic downstream buffer (ch).
buf := bytes.NewBuffer(grid.GetByteBuffer())
enc := json.NewEncoder(buf)
tmpEvt := struct{ Records []event.Event }{[]event.Event{{}}}
for { for {
select { select {
case <-ctx.Done():
grid.PutByteBuffer(buf.Bytes())
return nil
case ev := <-ch: case ev := <-ch:
if err := enc.Encode(ev); err != nil { buf.Reset()
return tmpEvt.Records[0] = ev
if err := enc.Encode(tmpEvt); err != nil {
logger.LogOnceIf(ctx, err, "event: Encode failed")
continue
} }
if len(ch) == 0 { out <- grid.NewBytesWith(append(grid.GetByteBuffer()[:0], buf.Bytes()...))
// Flush if nothing is queued
w.(http.Flusher).Flush()
}
case <-r.Context().Done():
return
case <-keepAliveTicker.C:
if err := enc.Encode(&event.Event{}); err != nil {
return
}
w.(http.Flusher).Flush()
} }
} }
} }
// TraceHandler sends http trace messages back to peer rest client // TraceHandler sends http trace messages back to peer rest client
func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) { func (s *peerRESTServer) TraceHandler(ctx context.Context, payload []byte, _ <-chan []byte, out chan<- []byte) *grid.RemoteErr {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
var traceOpts madmin.ServiceTraceOpts var traceOpts madmin.ServiceTraceOpts
err := traceOpts.ParseParams(r) err := json.Unmarshal(payload, &traceOpts)
if err != nil { if err != nil {
s.writeErrorResponse(w, errors.New("Invalid request")) return grid.NewRemoteErr(err)
return
} }
// Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers. // Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers.
// Use buffered channel to take care of burst sends or slow w.Write() // Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan madmin.TraceInfo, 100000) err = globalTrace.SubscribeJSON(traceOpts.TraceTypes(), out, ctx.Done(), func(entry madmin.TraceInfo) bool {
err = globalTrace.Subscribe(traceOpts.TraceTypes(), ch, r.Context().Done(), func(entry madmin.TraceInfo) bool {
return shouldTrace(entry, traceOpts) return shouldTrace(entry, traceOpts)
}) })
if err != nil { if err != nil {
s.writeErrorResponse(w, err) return grid.NewRemoteErr(err)
return
} }
// Publish bootstrap events that have already occurred before client could subscribe. // Publish bootstrap events that have already occurred before client could subscribe.
if traceOpts.TraceTypes().Contains(madmin.TraceBootstrap) { if traceOpts.TraceTypes().Contains(madmin.TraceBootstrap) {
go globalBootstrapTracer.Publish(r.Context(), globalTrace) go globalBootstrapTracer.Publish(ctx, globalTrace)
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
enc := gob.NewEncoder(w)
for {
select {
case entry := <-ch:
if err := enc.Encode(entry); err != nil {
return
}
case <-r.Context().Done():
return
case <-keepAliveTicker.C:
if err := enc.Encode(&madmin.TraceInfo{}); err != nil {
return
}
w.(http.Flusher).Flush()
}
} }
// Wait for remote to cancel.
<-ctx.Done()
return nil
} }
func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) { func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) {
@ -1522,7 +1489,7 @@ func (s *peerRESTServer) NetSpeedTestHandler(w http.ResponseWriter, r *http.Requ
} }
// registerPeerRESTHandlers - register peer rest router. // registerPeerRESTHandlers - register peer rest router.
func registerPeerRESTHandlers(router *mux.Router) { func registerPeerRESTHandlers(router *mux.Router, gm *grid.Manager) {
h := func(f http.HandlerFunc) http.HandlerFunc { h := func(f http.HandlerFunc) http.HandlerFunc {
return collectInternodeStats(httpTraceHdrs(f)) return collectInternodeStats(httpTraceHdrs(f))
} }
@ -1564,8 +1531,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(h(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(h(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(h(server.DownloadProfilingDataHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(h(server.DownloadProfilingDataHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(h(server.ListenHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocalDiskIDs).HandlerFunc(h(server.GetLocalDiskIDs)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocalDiskIDs).HandlerFunc(h(server.GetLocalDiskIDs))
@ -1584,4 +1549,11 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadRebalanceMeta).HandlerFunc(h(server.LoadRebalanceMetaHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadRebalanceMeta).HandlerFunc(h(server.LoadRebalanceMetaHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStopRebalance).HandlerFunc(h(server.StopRebalanceHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStopRebalance).HandlerFunc(h(server.StopRebalanceHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(h(server.GetLastDayTierStatsHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(h(server.GetLastDayTierStatsHandler))
logger.FatalIf(listenHandler.RegisterNoInput(gm, server.ListenHandler), "unable to register handler")
logger.FatalIf(gm.RegisterStreamingHandler(grid.HandlerTrace, grid.StreamHandler{
Handle: server.TraceHandler,
Subroute: "",
OutCapacity: 100000,
InCapacity: 0,
}), "unable to register handler")
} }

View File

@ -30,7 +30,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerPools Endpoint
registerStorageRESTHandlers(router, endpointServerPools, globalGrid.Load()) registerStorageRESTHandlers(router, endpointServerPools, globalGrid.Load())
// Register peer REST router only if its a distributed setup. // Register peer REST router only if its a distributed setup.
registerPeerRESTHandlers(router) registerPeerRESTHandlers(router, globalGrid.Load())
// Register peer S3 router only if its a distributed setup. // Register peer S3 router only if its a distributed setup.
registerPeerS3Handlers(router) registerPeerS3Handlers(router)

View File

@ -59,8 +59,10 @@ const (
HandlerRenameData HandlerRenameData
HandlerRenameFile HandlerRenameFile
HandlerReadAll HandlerReadAll
HandlerServerVerify HandlerServerVerify
HandlerTrace
HandlerListen
// Add more above here ^^^ // Add more above here ^^^
// If all handlers are used, the type of Handler can be changed. // If all handlers are used, the type of Handler can be changed.
// Handlers have no versioning, so non-compatible handler changes must result in new IDs. // Handlers have no versioning, so non-compatible handler changes must result in new IDs.
@ -542,6 +544,20 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Register(m *Manager, handle func
return h.register(m, handle, subroute...) return h.register(m, handle, subroute...)
} }
// WithOutCapacity adjusts the output capacity from the handler perspective.
// This must be done prior to registering the handler.
func (h *StreamTypeHandler[Payload, Req, Resp]) WithOutCapacity(out int) *StreamTypeHandler[Payload, Req, Resp] {
h.OutCapacity = out
return h
}
// WithInCapacity adjusts the input capacity from the handler perspective.
// This must be done prior to registering the handler.
func (h *StreamTypeHandler[Payload, Req, Resp]) WithInCapacity(in int) *StreamTypeHandler[Payload, Req, Resp] {
h.InCapacity = in
return h
}
// RegisterNoInput a handler for one-way streaming with payload and output stream. // RegisterNoInput a handler for one-way streaming with payload and output stream.
// An optional subroute can be given. Multiple entries are joined with '/'. // An optional subroute can be given. Multiple entries are joined with '/'.
func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput(m *Manager, handle func(ctx context.Context, p Payload, out chan<- Resp) *RemoteErr, subroute ...string) error { func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput(m *Manager, handle func(ctx context.Context, p Payload, out chan<- Resp) *RemoteErr, subroute ...string) error {

View File

@ -30,14 +30,16 @@ func _() {
_ = x[HandlerRenameFile-19] _ = x[HandlerRenameFile-19]
_ = x[HandlerReadAll-20] _ = x[HandlerReadAll-20]
_ = x[HandlerServerVerify-21] _ = x[HandlerServerVerify-21]
_ = x[handlerTest-22] _ = x[HandlerTrace-22]
_ = x[handlerTest2-23] _ = x[HandlerListen-23]
_ = x[handlerLast-24] _ = x[handlerTest-24]
_ = x[handlerTest2-25]
_ = x[handlerLast-26]
} }
const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyhandlerTesthandlerTest2handlerLast" const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenhandlerTesthandlerTest2handlerLast"
var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 236, 248, 259} var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 247, 259, 270}
func (i HandlerID) String() string { func (i HandlerID) String() string {
if i >= HandlerID(len(_HandlerID_index)-1) { if i >= HandlerID(len(_HandlerID_index)-1) {

View File

@ -22,6 +22,7 @@ import (
"net/url" "net/url"
"sort" "sort"
"strings" "strings"
"sync"
"github.com/tinylib/msgp/msgp" "github.com/tinylib/msgp/msgp"
) )
@ -198,3 +199,133 @@ func (b *Bytes) Msgsize() int {
} }
return msgp.ArrayHeaderSize + len(*b) return msgp.ArrayHeaderSize + len(*b)
} }
// Recycle puts the Bytes back into the pool.
func (b *Bytes) Recycle() {
if *b != nil {
PutByteBuffer(*b)
*b = nil
}
}
// URLValues can be used for url.Values.
type URLValues map[string][]string
var urlValuesPool = sync.Pool{
New: func() interface{} {
return make(map[string][]string, 10)
},
}
// NewURLValues returns a new URLValues.
func NewURLValues() *URLValues {
u := URLValues(urlValuesPool.Get().(map[string][]string))
return &u
}
// NewURLValuesWith returns a new URLValues with the provided content.
func NewURLValuesWith(values map[string][]string) *URLValues {
u := URLValues(values)
return &u
}
// Values returns the url.Values.
// If u is nil, an empty url.Values is returned.
// The values are a shallow copy of the underlying map.
func (u *URLValues) Values() url.Values {
if u == nil {
return url.Values{}
}
return url.Values(*u)
}
// Recycle the underlying map.
func (u *URLValues) Recycle() {
if *u != nil {
for key := range *u {
delete(*u, key)
}
val := map[string][]string(*u)
urlValuesPool.Put(val)
*u = nil
}
}
// MarshalMsg implements msgp.Marshaler
func (u URLValues) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, u.Msgsize())
o = msgp.AppendMapHeader(o, uint32(len(u)))
for zb0006, zb0007 := range u {
o = msgp.AppendString(o, zb0006)
o = msgp.AppendArrayHeader(o, uint32(len(zb0007)))
for zb0008 := range zb0007 {
o = msgp.AppendString(o, zb0007[zb0008])
}
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (u *URLValues) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0004 uint32
zb0004, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if *u == nil {
*u = urlValuesPool.Get().(map[string][]string)
}
if len(*u) > 0 {
for key := range *u {
delete(*u, key)
}
}
for zb0004 > 0 {
var zb0001 string
var zb0002 []string
zb0004--
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
var zb0005 uint32
zb0005, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
if cap(zb0002) >= int(zb0005) {
zb0002 = zb0002[:zb0005]
} else {
zb0002 = make([]string, zb0005)
}
for zb0003 := range zb0002 {
zb0002[zb0003], bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, zb0001, zb0003)
return
}
}
(*u)[zb0001] = zb0002
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (u URLValues) Msgsize() (s int) {
s = msgp.MapHeaderSize
if u != nil {
for zb0006, zb0007 := range u {
_ = zb0007
s += msgp.StringPrefixSize + len(zb0006) + msgp.ArrayHeaderSize
for zb0008 := range zb0007 {
s += msgp.StringPrefixSize + len(zb0007[zb0008])
}
}
}
return
}

View File

@ -18,11 +18,18 @@
package pubsub package pubsub
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
) )
// GetByteBuffer returns a byte buffer from the pool.
var GetByteBuffer = func() []byte {
return make([]byte, 0, 4096)
}
// Sub - subscriber entity. // Sub - subscriber entity.
type Sub[T Maskable] struct { type Sub[T Maskable] struct {
ch chan T ch chan T
@ -96,6 +103,62 @@ func (ps *PubSub[T, M]) Subscribe(mask M, subCh chan T, doneCh <-chan struct{},
return nil return nil
} }
// SubscribeJSON - Adds a subscriber to pubsub system and returns results with JSON encoding.
func (ps *PubSub[T, M]) SubscribeJSON(mask M, subCh chan<- []byte, doneCh <-chan struct{}, filter func(entry T) bool) error {
totalSubs := atomic.AddInt32(&ps.numSubscribers, 1)
if ps.maxSubscribers > 0 && totalSubs > ps.maxSubscribers {
atomic.AddInt32(&ps.numSubscribers, -1)
return fmt.Errorf("the limit of `%d` subscribers is reached", ps.maxSubscribers)
}
ps.Lock()
defer ps.Unlock()
subChT := make(chan T, 10000)
sub := &Sub[T]{ch: subChT, types: Mask(mask.Mask()), filter: filter}
ps.subs = append(ps.subs, sub)
// We hold a lock, so we are safe to update
combined := Mask(atomic.LoadUint64(&ps.types))
combined.Merge(Mask(mask.Mask()))
atomic.StoreUint64(&ps.types, uint64(combined))
go func() {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
for {
select {
case <-doneCh:
case v, ok := <-subChT:
if !ok {
break
}
buf.Reset()
err := enc.Encode(v)
if err != nil {
break
}
subCh <- append(GetByteBuffer()[:0], buf.Bytes()...)
continue
}
break
}
ps.Lock()
defer ps.Unlock()
var remainTypes Mask
for i, s := range ps.subs {
if s == sub {
ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
} else {
remainTypes.Merge(s.types)
}
}
atomic.StoreUint64(&ps.types, uint64(remainTypes))
atomic.AddInt32(&ps.numSubscribers, -1)
}()
return nil
}
// NumSubscribers returns the number of current subscribers, // NumSubscribers returns the number of current subscribers,
// The mask is checked against the active subscribed types, // The mask is checked against the active subscribed types,
// and 0 will be returned if nobody is subscribed for the type(s). // and 0 will be returned if nobody is subscribed for the type(s).