Improve object reuse for grid messages (#18940)

Allow internal types to support a `Recycler` interface, which will allow for sharing of common types across handlers.

This means that all `grid.MSS` (and similar) objects are shared across in a common pool instead of a per-handler pool.

Add internal request reuse of internal types. Add for safe (pointerless) types explicitly.

Only log params for internal types. Doing Sprint(obj) is just a bit too messy.
This commit is contained in:
Klaus Post 2024-02-01 12:41:20 -08:00 committed by GitHub
parent 6440d0fbf3
commit b192bc348c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 256 additions and 76 deletions

View File

@ -152,10 +152,12 @@ func (client *bootstrapRESTClient) Verify(ctx context.Context, srcCfg *ServerSys
return nil
}
recvCfg, err := serverVerifyHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{}))
recvCfg, err := serverVerifyHandler.Call(ctx, client.gridConn, grid.NewMSS())
if err != nil {
return err
}
// We do not need the response after returning.
defer serverVerifyHandler.PutResponse(recvCfg)
return srcCfg.Diff(recvCfg)
}

View File

@ -981,7 +981,7 @@ func (s *peerRESTServer) ListenHandler(ctx context.Context, v *grid.URLValues, o
logger.LogOnceIf(ctx, err, "event: Encode failed")
continue
}
out <- grid.NewBytesWith(append(grid.GetByteBuffer()[:0], buf.Bytes()...))
out <- grid.NewBytesWithCopyOf(buf.Bytes())
}
}
}

View File

@ -346,6 +346,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo
return vol, toStorageErr(err)
}
vol = *v
// Performs shallow copy, so we can reuse.
storageStatVolHandler.PutResponse(v)
return vol, nil
}
@ -455,6 +456,7 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
if err != nil {
return 0, toStorageErr(err)
}
defer storageRenameDataHandler.PutResponse(resp)
return resp.Signature, nil
}

View File

@ -201,7 +201,7 @@ func (s *storageRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request
// DiskInfo types.
// DiskInfo.Metrics elements are shared, so we cannot reuse.
var storageDiskInfoHandler = grid.NewSingleHandler[*DiskInfoOptions, *DiskInfo](grid.HandlerDiskInfo, func() *DiskInfoOptions { return &DiskInfoOptions{} },
func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse()
func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse().AllowCallRequestPool(true)
// DiskInfoHandler - returns disk info.
func (s *storageRESTServer) DiskInfoHandler(opts *DiskInfoOptions) (*DiskInfo, *grid.RemoteErr) {
@ -495,7 +495,7 @@ func (s *storageRESTServer) CheckPartsHandler(p *CheckPartsHandlerParams) (grid.
var storageReadAllHandler = grid.NewSingleHandler[*ReadAllHandlerParams, *grid.Bytes](grid.HandlerReadAll, func() *ReadAllHandlerParams {
return &ReadAllHandlerParams{}
}, grid.NewBytes)
}, grid.NewBytes).AllowCallRequestPool(true)
// ReadAllHandler - read all the contents of a file.
func (s *storageRESTServer) ReadAllHandler(p *ReadAllHandlerParams) (*grid.Bytes, *grid.RemoteErr) {
@ -673,7 +673,7 @@ func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Reques
var storageDeleteFileHandler = grid.NewSingleHandler[*DeleteFileHandlerParams, grid.NoPayload](grid.HandlerDeleteFile, func() *DeleteFileHandlerParams {
return &DeleteFileHandlerParams{}
}, grid.NewNoPayload)
}, grid.NewNoPayload).AllowCallRequestPool(true)
// DeleteFileHandler - delete a file.
func (s *storageRESTServer) DeleteFileHandler(p *DeleteFileHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
@ -751,7 +751,7 @@ func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*Rena
var storageRenameFileHandler = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams {
return &RenameFileHandlerParams{}
}, grid.NewNoPayload)
}, grid.NewNoPayload).AllowCallRequestPool(true)
// RenameFileHandler - rename a file from source to destination
func (s *storageRESTServer) RenameFileHandler(p *RenameFileHandlerParams) (grid.NoPayload, *grid.RemoteErr) {

View File

@ -231,6 +231,7 @@ func TestSingleRoundtripGenerics(t *testing.T) {
t.Errorf("want %q, got %q", testPayload, resp.OrgString)
}
t.Log("Roundtrip:", time.Since(start))
h1.PutResponse(resp)
start = time.Now()
resp, err = h2.Call(context.Background(), remoteConn, &testRequest{Num: 1, String: testPayload})
@ -241,9 +242,74 @@ func TestSingleRoundtripGenerics(t *testing.T) {
if resp != nil {
t.Errorf("want nil, got %q", resp)
}
h2.PutResponse(resp)
t.Log("Roundtrip:", time.Since(start))
}
func TestSingleRoundtripGenericsRecycle(t *testing.T) {
defer testlogger.T.SetLogTB(t)()
errFatal := func(err error) {
t.Helper()
if err != nil {
t.Fatal(err)
}
}
grid, err := SetupTestGrid(2)
errFatal(err)
remoteHost := grid.Hosts[1]
local := grid.Managers[0]
remote := grid.Managers[1]
// 1: Echo
h1 := NewSingleHandler[*MSS, *MSS](handlerTest, NewMSS, NewMSS)
// Handles incoming requests, returns a response
handler1 := func(req *MSS) (resp *MSS, err *RemoteErr) {
resp = h1.NewResponse()
for k, v := range *req {
(*resp)[k] = v
}
return resp, nil
}
// Return error
h2 := NewSingleHandler[*MSS, *MSS](handlerTest2, NewMSS, NewMSS)
handler2 := func(req *MSS) (resp *MSS, err *RemoteErr) {
defer req.Recycle()
r := RemoteErr(req.Get("err"))
return nil, &r
}
errFatal(h1.Register(local, handler1))
errFatal(h2.Register(local, handler2))
errFatal(h1.Register(remote, handler1))
errFatal(h2.Register(remote, handler2))
// local to remote connection
remoteConn := local.Connection(remoteHost)
const testPayload = "Hello Grid World!"
start := time.Now()
req := NewMSSWith(map[string]string{"test": testPayload})
resp, err := h1.Call(context.Background(), remoteConn, req)
errFatal(err)
if resp.Get("test") != testPayload {
t.Errorf("want %q, got %q", testPayload, resp.Get("test"))
}
t.Log("Roundtrip:", time.Since(start))
h1.PutResponse(resp)
start = time.Now()
resp, err = h2.Call(context.Background(), remoteConn, NewMSSWith(map[string]string{"err": testPayload}))
t.Log("Roundtrip:", time.Since(start))
if err != RemoteErr(testPayload) {
t.Errorf("want error %v(%T), got %v(%T)", RemoteErr(testPayload), RemoteErr(testPayload), err, err)
}
if resp != nil {
t.Errorf("want nil, got %q", resp)
}
t.Log("Roundtrip:", time.Since(start))
h2.PutResponse(resp)
}
func TestStreamSuite(t *testing.T) {
defer testlogger.T.SetErrorTB(t)()
errFatal := func(err error) {

View File

@ -335,14 +335,36 @@ type RoundTripper interface {
// SingleHandler is a type safe handler for single roundtrip requests.
type SingleHandler[Req, Resp RoundTripper] struct {
id HandlerID
sharedResponse bool
id HandlerID
sharedResp bool
callReuseReq bool
reqPool sync.Pool
respPool sync.Pool
newReq func() Req
newResp func() Resp
nilReq Req
nilResp Resp
recycleReq func(Req)
recycleResp func(Resp)
}
func recycleFunc[RT RoundTripper](newRT func() RT) (newFn func() RT, recycle func(r RT)) {
rAny := any(newRT())
if rc, ok := rAny.(Recycler); ok {
return newRT, func(r RT) {
rc.Recycle()
}
}
pool := sync.Pool{
New: func() interface{} {
return newRT()
},
}
var rZero RT
return func() RT { return pool.Get().(RT) },
func(r RT) {
if r != rZero {
pool.Put(r)
}
}
}
// NewSingleHandler creates a typed handler that can provide Marshal/Unmarshal.
@ -350,27 +372,34 @@ type SingleHandler[Req, Resp RoundTripper] struct {
// Use Call to initiate a clientside call.
func NewSingleHandler[Req, Resp RoundTripper](h HandlerID, newReq func() Req, newResp func() Resp) *SingleHandler[Req, Resp] {
s := SingleHandler[Req, Resp]{id: h}
s.reqPool.New = func() interface{} {
return newReq()
}
s.respPool.New = func() interface{} {
return newResp()
s.newReq, s.recycleReq = recycleFunc[Req](newReq)
s.newResp, s.recycleResp = recycleFunc[Resp](newResp)
if _, ok := any(newReq()).(Recycler); ok {
s.callReuseReq = true
}
return &s
}
// PutResponse will accept a response for reuse.
// These should be returned by the caller.
// This can be used by a caller to recycle a response after receiving it from a Call.
func (h *SingleHandler[Req, Resp]) PutResponse(r Resp) {
if r != h.nilResp {
h.respPool.Put(r)
}
h.recycleResp(r)
}
// WithSharedResponse indicates it is unsafe to reuse the response.
// AllowCallRequestPool indicates it is safe to reuse the request
// on the client side, meaning the request is recycled/pooled when a request is sent.
// CAREFUL: This should only be used when there are no pointers, slices that aren't freshly constructed.
func (h *SingleHandler[Req, Resp]) AllowCallRequestPool(b bool) *SingleHandler[Req, Resp] {
h.callReuseReq = b
return h
}
// WithSharedResponse indicates it is unsafe to reuse the response
// when it has been returned on a handler.
// This will disable automatic response recycling/pooling.
// Typically this is used when the response sharing part of its data structure.
func (h *SingleHandler[Req, Resp]) WithSharedResponse() *SingleHandler[Req, Resp] {
h.sharedResponse = true
h.sharedResp = true
return h
}
@ -378,26 +407,25 @@ func (h *SingleHandler[Req, Resp]) WithSharedResponse() *SingleHandler[Req, Resp
// Handlers can use this to create a reusable response.
// The response may be reused, so caller should clear any fields.
func (h *SingleHandler[Req, Resp]) NewResponse() Resp {
return h.respPool.Get().(Resp)
}
// putRequest will accept a request for reuse.
// This is not exported, since it shouldn't be needed.
func (h *SingleHandler[Req, Resp]) putRequest(r Req) {
if r != h.nilReq {
h.reqPool.Put(r)
}
return h.newResp()
}
// NewRequest creates a new request.
// Handlers can use this to create a reusable request.
// The request may be reused, so caller should clear any fields.
func (h *SingleHandler[Req, Resp]) NewRequest() Req {
return h.reqPool.Get().(Req)
return h.newReq()
}
// Register a handler for a Req -> Resp roundtrip.
// Requests are automatically recycled.
func (h *SingleHandler[Req, Resp]) Register(m *Manager, handle func(req Req) (resp Resp, err *RemoteErr), subroute ...string) error {
if h.newReq == nil {
return errors.New("newReq nil in NewSingleHandler")
}
if h.newResp == nil {
return errors.New("newResp nil in NewSingleHandler")
}
return m.RegisterSingleHandler(h.id, func(payload []byte) ([]byte, *RemoteErr) {
req := h.NewRequest()
_, err := req.UnmarshalMsg(payload)
@ -407,13 +435,13 @@ func (h *SingleHandler[Req, Resp]) Register(m *Manager, handle func(req Req) (re
return nil, &r
}
resp, rerr := handle(req)
h.putRequest(req)
h.recycleReq(req)
if rerr != nil {
PutByteBuffer(payload)
return nil, rerr
}
payload, err = resp.MarshalMsg(payload[:0])
if !h.sharedResponse {
if !h.sharedResp {
h.PutResponse(resp)
}
if err != nil {
@ -438,7 +466,18 @@ func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Re
if err != nil {
return resp, err
}
ctx = context.WithValue(ctx, TraceParamsKey{}, req)
switch any(req).(type) {
case *MSS, *Bytes, *URLValues:
ctx = context.WithValue(ctx, TraceParamsKey{}, req)
case *NoPayload:
default:
ctx = context.WithValue(ctx, TraceParamsKey{}, fmt.Sprintf("type=%T", req))
}
if h.callReuseReq {
defer func() {
h.recycleReq(req)
}()
}
res, err := c.Request(ctx, h.id, payload)
PutByteBuffer(payload)
if err != nil {
@ -764,26 +803,3 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre
return &TypedStream[Req, Resp]{responses: stream, newResp: h.NewResponse, Requests: reqT}, nil
}
// NoPayload is a type that can be used for handlers that do not use a payload.
type NoPayload struct{}
// Msgsize returns 0.
func (p NoPayload) Msgsize() int {
return 0
}
// UnmarshalMsg satisfies the interface, but is a no-op.
func (NoPayload) UnmarshalMsg(bytes []byte) ([]byte, error) {
return bytes, nil
}
// MarshalMsg satisfies the interface, but is a no-op.
func (NoPayload) MarshalMsg(bytes []byte) ([]byte, error) {
return bytes, nil
}
// NewNoPayload returns an empty NoPayload struct.
func NewNoPayload() NoPayload {
return NoPayload{}
}

View File

@ -21,7 +21,6 @@ import (
"context"
"fmt"
"net/http"
"net/url"
"strings"
"time"
@ -131,22 +130,26 @@ func (c *muxClient) traceRoundtrip(ctx context.Context, t *tracer, h HandlerID,
}
// If the context contains a TraceParamsKey, add it to the trace path.
v := ctx.Value(TraceParamsKey{})
if p, ok := v.(*MSS); ok && p != nil {
trace.Path += p.ToQuery()
trace.HTTP.ReqInfo.Path = trace.Path
} else if p, ok := v.(map[string]string); ok {
m := MSS(p)
// Should match SingleHandler.Call checks.
switch typed := v.(type) {
case *MSS:
trace.Path += typed.ToQuery()
case map[string]string:
m := MSS(typed)
trace.Path += m.ToQuery()
trace.HTTP.ReqInfo.Path = trace.Path
} else if v != nil {
// Print exported fields as single request to path.
obj := fmt.Sprintf("%+v", v)
if len(obj) > 1024 {
obj = obj[:1024] + "..."
case *URLValues:
trace.Path += typed.Values().Encode()
case *NoPayload:
case *Bytes:
if typed != nil {
trace.Path = fmt.Sprintf("%s?bytes=%d", trace.Path, len(*typed))
}
trace.Path = fmt.Sprintf("%s?req=%s", trace.Path, url.QueryEscape(obj))
trace.HTTP.ReqInfo.Path = trace.Path
case string:
trace.Path = fmt.Sprintf("%s?%s", trace.Path, typed)
default:
}
trace.HTTP.ReqInfo.Path = trace.Path
t.Publisher.Publish(trace)
return resp, err
}

View File

@ -27,6 +27,14 @@ import (
"github.com/tinylib/msgp/msgp"
)
// Recycler will override the internal reuse in typed handlers.
// When this is supported, the handler will not do internal pooling of objects,
// call Recycle() when the object is no longer needed.
// The recycler should handle nil pointers.
type Recycler interface {
Recycle()
}
// MSS is a map[string]string that can be serialized.
// It is not very efficient, but it is only used for easy parameter passing.
type MSS map[string]string
@ -39,6 +47,14 @@ func (m *MSS) Get(key string) string {
return (*m)[key]
}
// Set a key, value pair.
func (m *MSS) Set(key, value string) {
if m == nil {
*m = mssPool.Get().(map[string]string)
}
(*m)[key] = value
}
// UnmarshalMsg deserializes m from the provided byte slice and returns the
// remainder of bytes.
func (m *MSS) UnmarshalMsg(bts []byte) (o []byte, err error) {
@ -111,7 +127,10 @@ func (m *MSS) Msgsize() int {
// NewMSS returns a new MSS.
func NewMSS() *MSS {
m := MSS(make(map[string]string))
m := MSS(mssPool.Get().(map[string]string))
for k := range m {
delete(m, k)
}
return &m
}
@ -121,6 +140,20 @@ func NewMSSWith(m map[string]string) *MSS {
return &m2
}
var mssPool = sync.Pool{
New: func() interface{} {
return make(map[string]string, 5)
},
}
// Recycle the underlying map.
func (m *MSS) Recycle() {
if m != nil && *m != nil {
mssPool.Put(map[string]string(*m))
*m = nil
}
}
// ToQuery constructs a URL query string from the MSS, including "?" if there are any keys.
func (m MSS) ToQuery() string {
if len(m) == 0 {
@ -147,17 +180,36 @@ func (m MSS) ToQuery() string {
}
// NewBytes returns a new Bytes.
// A slice is preallocated.
func NewBytes() *Bytes {
b := Bytes(GetByteBuffer()[:0])
return &b
}
// NewBytesWith returns a new Bytes with the provided content.
// When sent as a parameter, the caller gives up ownership of the byte slice.
// When returned as response, the handler also gives up ownership of the byte slice.
func NewBytesWith(b []byte) *Bytes {
bb := Bytes(b)
return &bb
}
// NewBytesWithCopyOf returns a new byte slice with a copy of the provided content.
func NewBytesWithCopyOf(b []byte) *Bytes {
if b == nil {
bb := Bytes(nil)
return &bb
}
if len(b) < maxBufferSize {
bb := NewBytes()
*bb = append(*bb, b...)
return bb
}
bb := Bytes(make([]byte, len(b)))
copy(bb, b)
return &bb
}
// Bytes provides a byte slice that can be serialized.
type Bytes []byte
@ -168,6 +220,9 @@ func (b *Bytes) UnmarshalMsg(bytes []byte) ([]byte, error) {
return bytes, errors.New("Bytes: UnmarshalMsg on nil pointer")
}
if bytes, err := msgp.ReadNilBytes(bytes); err == nil {
if *b != nil {
PutByteBuffer(*b)
}
*b = nil
return bytes, nil
}
@ -179,7 +234,16 @@ func (b *Bytes) UnmarshalMsg(bytes []byte) ([]byte, error) {
*b = (*b)[:len(val)]
copy(*b, val)
} else {
*b = append(make([]byte, 0, len(val)), val...)
if cap(*b) == 0 && len(val) <= maxBufferSize {
PutByteBuffer(*b)
*b = GetByteBuffer()[:0]
} else {
PutByteBuffer(*b)
*b = make([]byte, 0, len(val))
}
in := *b
in = append(in[:0], val...)
*b = in
}
return bytes, nil
}
@ -202,7 +266,8 @@ func (b *Bytes) Msgsize() int {
// Recycle puts the Bytes back into the pool.
func (b *Bytes) Recycle() {
if *b != nil {
if b != nil && *b != nil {
*b = (*b)[:0]
PutByteBuffer(*b)
*b = nil
}
@ -329,3 +394,29 @@ func (u URLValues) Msgsize() (s int) {
}
return
}
// NoPayload is a type that can be used for handlers that do not use a payload.
type NoPayload struct{}
// Msgsize returns 0.
func (p NoPayload) Msgsize() int {
return 0
}
// UnmarshalMsg satisfies the interface, but is a no-op.
func (NoPayload) UnmarshalMsg(bytes []byte) ([]byte, error) {
return bytes, nil
}
// MarshalMsg satisfies the interface, but is a no-op.
func (NoPayload) MarshalMsg(bytes []byte) ([]byte, error) {
return bytes, nil
}
// NewNoPayload returns an empty NoPayload struct.
func NewNoPayload() NoPayload {
return NoPayload{}
}
// Recycle is a no-op.
func (NoPayload) Recycle() {}