Convert more peer <--> peer REST calls (#19004)

* Convert more peer <--> peer REST calls
* Clean up in general.
* Add JSON wrapper.
* Add slice wrapper.
* Add option to make handler return nil error if no connection is given, `IgnoreNilConn`.

Converts the following:

```
+	HandlerGetMetrics
+	HandlerGetResourceMetrics
+	HandlerGetMemInfo
+	HandlerGetProcInfo
+	HandlerGetOSInfo
+	HandlerGetPartitions
+	HandlerGetNetInfo
+	HandlerGetCPUs
+	HandlerServerInfo
+	HandlerGetSysConfig
+	HandlerGetSysServices
+	HandlerGetSysErrors
+	HandlerGetAllBucketStats
+	HandlerGetBucketStats
+	HandlerGetSRMetrics
+	HandlerGetPeerMetrics
+	HandlerGetMetacacheListing
+	HandlerUpdateMetacacheListing
+	HandlerGetPeerBucketMetrics
+	HandlerStorageInfo
+	HandlerGetLocks
+	HandlerBackgroundHealStatus
+	HandlerGetLastDayTierStats
+	HandlerSignalService
+	HandlerGetBandwidth
```
This commit is contained in:
Klaus Post
2024-02-19 14:54:46 -08:00
committed by GitHub
parent 4c8197a119
commit e06168596f
29 changed files with 4794 additions and 979 deletions

View File

@@ -17,6 +17,8 @@
package bandwidth
//go:generate msgp -file=$GOFILE -unexported
import (
"context"
"sync"
@@ -25,6 +27,8 @@ import (
"golang.org/x/time/rate"
)
//msgp:ignore bucketThrottle Monitor
type bucketThrottle struct {
*rate.Limiter
NodeBandwidthPerSec int64

View File

@@ -0,0 +1,218 @@
package bandwidth
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *BucketBandwidthReport) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z BucketBandwidthReport) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 0
err = en.Append(0x80)
if err != nil {
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z BucketBandwidthReport) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 0
o = append(o, 0x80)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *BucketBandwidthReport) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z BucketBandwidthReport) Msgsize() (s int) {
s = 1
return
}
// DecodeMsg implements msgp.Decodable
func (z *Details) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "LimitInBytesPerSecond":
z.LimitInBytesPerSecond, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "LimitInBytesPerSecond")
return
}
case "CurrentBandwidthInBytesPerSecond":
z.CurrentBandwidthInBytesPerSecond, err = dc.ReadFloat64()
if err != nil {
err = msgp.WrapError(err, "CurrentBandwidthInBytesPerSecond")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z Details) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "LimitInBytesPerSecond"
err = en.Append(0x82, 0xb5, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x49, 0x6e, 0x42, 0x79, 0x74, 0x65, 0x73, 0x50, 0x65, 0x72, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64)
if err != nil {
return
}
err = en.WriteInt64(z.LimitInBytesPerSecond)
if err != nil {
err = msgp.WrapError(err, "LimitInBytesPerSecond")
return
}
// write "CurrentBandwidthInBytesPerSecond"
err = en.Append(0xd9, 0x20, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x61, 0x6e, 0x64, 0x77, 0x69, 0x64, 0x74, 0x68, 0x49, 0x6e, 0x42, 0x79, 0x74, 0x65, 0x73, 0x50, 0x65, 0x72, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64)
if err != nil {
return
}
err = en.WriteFloat64(z.CurrentBandwidthInBytesPerSecond)
if err != nil {
err = msgp.WrapError(err, "CurrentBandwidthInBytesPerSecond")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z Details) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "LimitInBytesPerSecond"
o = append(o, 0x82, 0xb5, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x49, 0x6e, 0x42, 0x79, 0x74, 0x65, 0x73, 0x50, 0x65, 0x72, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64)
o = msgp.AppendInt64(o, z.LimitInBytesPerSecond)
// string "CurrentBandwidthInBytesPerSecond"
o = append(o, 0xd9, 0x20, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x61, 0x6e, 0x64, 0x77, 0x69, 0x64, 0x74, 0x68, 0x49, 0x6e, 0x42, 0x79, 0x74, 0x65, 0x73, 0x50, 0x65, 0x72, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64)
o = msgp.AppendFloat64(o, z.CurrentBandwidthInBytesPerSecond)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Details) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "LimitInBytesPerSecond":
z.LimitInBytesPerSecond, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "LimitInBytesPerSecond")
return
}
case "CurrentBandwidthInBytesPerSecond":
z.CurrentBandwidthInBytesPerSecond, bts, err = msgp.ReadFloat64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "CurrentBandwidthInBytesPerSecond")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z Details) Msgsize() (s int) {
s = 1 + 22 + msgp.Int64Size + 34 + msgp.Float64Size
return
}

View File

@@ -0,0 +1,236 @@
package bandwidth
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalBucketBandwidthReport(t *testing.T) {
v := BucketBandwidthReport{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgBucketBandwidthReport(b *testing.B) {
v := BucketBandwidthReport{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgBucketBandwidthReport(b *testing.B) {
v := BucketBandwidthReport{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalBucketBandwidthReport(b *testing.B) {
v := BucketBandwidthReport{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeBucketBandwidthReport(t *testing.T) {
v := BucketBandwidthReport{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeBucketBandwidthReport Msgsize() is inaccurate")
}
vn := BucketBandwidthReport{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeBucketBandwidthReport(b *testing.B) {
v := BucketBandwidthReport{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeBucketBandwidthReport(b *testing.B) {
v := BucketBandwidthReport{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalDetails(t *testing.T) {
v := Details{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgDetails(b *testing.B) {
v := Details{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgDetails(b *testing.B) {
v := Details{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalDetails(b *testing.B) {
v := Details{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeDetails(t *testing.T) {
v := Details{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeDetails Msgsize() is inaccurate")
}
vn := Details{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeDetails(b *testing.B) {
v := Details{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeDetails(b *testing.B) {
v := Details{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@@ -36,6 +36,7 @@ import (
// HandlerID is a handler identifier.
// It is used to determine request routing on the server.
// Handlers can be registered with a static subroute.
// Do NOT remove or change the order of existing handlers.
const (
// handlerInvalid is reserved to check for uninitialized values.
handlerInvalid HandlerID = iota
@@ -69,7 +70,6 @@ const (
HandlerStopRebalance
HandlerLoadRebalanceMeta
HandlerLoadTransitionTierConfig
HandlerDeletePolicy
HandlerLoadPolicy
HandlerLoadPolicyMapping
@@ -78,11 +78,37 @@ const (
HandlerDeleteUser
HandlerLoadUser
HandlerLoadGroup
HandlerHealBucket
HandlerMakeBucket
HandlerHeadBucket
HandlerDeleteBucket
HandlerGetMetrics
HandlerGetResourceMetrics
HandlerGetMemInfo
HandlerGetProcInfo
HandlerGetOSInfo
HandlerGetPartitions
HandlerGetNetInfo
HandlerGetCPUs
HandlerServerInfo
HandlerGetSysConfig
HandlerGetSysServices
HandlerGetSysErrors
HandlerGetAllBucketStats
HandlerGetBucketStats
HandlerGetSRMetrics
HandlerGetPeerMetrics
HandlerGetMetacacheListing
HandlerUpdateMetacacheListing
HandlerGetPeerBucketMetrics
HandlerStorageInfo
HandlerConsoleLog
HandlerListDir
HandlerGetLocks
HandlerBackgroundHealStatus
HandlerGetLastDayTierStats
HandlerSignalService
HandlerGetBandwidth
// Add more above here ^^^
// If all handlers are used, the type of Handler can be changed.
@@ -137,6 +163,28 @@ var handlerPrefixes = [handlerLast]string{
HandlerHeadBucket: peerPrefixS3,
HandlerDeleteBucket: peerPrefixS3,
HandlerHealBucket: healPrefix,
HandlerGetMetrics: peerPrefix,
HandlerGetResourceMetrics: peerPrefix,
HandlerGetMemInfo: peerPrefix,
HandlerGetProcInfo: peerPrefix,
HandlerGetOSInfo: peerPrefix,
HandlerGetPartitions: peerPrefix,
HandlerGetNetInfo: peerPrefix,
HandlerGetCPUs: peerPrefix,
HandlerServerInfo: peerPrefix,
HandlerGetSysConfig: peerPrefix,
HandlerGetSysServices: peerPrefix,
HandlerGetSysErrors: peerPrefix,
HandlerGetAllBucketStats: peerPrefix,
HandlerGetBucketStats: peerPrefix,
HandlerGetSRMetrics: peerPrefix,
HandlerGetPeerMetrics: peerPrefix,
HandlerGetMetacacheListing: peerPrefix,
HandlerUpdateMetacacheListing: peerPrefix,
HandlerGetPeerBucketMetrics: peerPrefix,
HandlerStorageInfo: peerPrefix,
HandlerConsoleLog: peerPrefix,
HandlerListDir: storagePrefix,
}
const (
@@ -344,9 +392,10 @@ type RoundTripper interface {
// SingleHandler is a type safe handler for single roundtrip requests.
type SingleHandler[Req, Resp RoundTripper] struct {
id HandlerID
sharedResp bool
callReuseReq bool
id HandlerID
sharedResp bool
callReuseReq bool
ignoreNilConn bool
newReq func() Req
newResp func() Resp
@@ -407,6 +456,17 @@ func (h *SingleHandler[Req, Resp]) AllowCallRequestPool(b bool) *SingleHandler[R
return h
}
// IgnoreNilConn will ignore nil connections when calling.
// This will make Call return nil instead of ErrDisconnected when the connection is nil.
// This may only be set ONCE before use.
func (h *SingleHandler[Req, Resp]) IgnoreNilConn() *SingleHandler[Req, Resp] {
if h.ignoreNilConn {
logger.LogOnceIf(context.Background(), fmt.Errorf("%s: IgnoreNilConn called twice", h.id.String()), h.id.String()+"IgnoreNilConn")
}
h.ignoreNilConn = true
return h
}
// WithSharedResponse indicates it is unsafe to reuse the response
// when it has been returned on a handler.
// This will disable automatic response recycling/pooling.
@@ -476,6 +536,12 @@ type Requester interface {
// The response should be returned with PutResponse when no error.
// If no deadline is set, a 1-minute deadline is added.
func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Req) (resp Resp, err error) {
if c == nil {
if h.ignoreNilConn {
return resp, nil
}
return resp, ErrDisconnected
}
payload, err := req.MarshalMsg(GetByteBuffer()[:0])
if err != nil {
return resp, err
@@ -777,6 +843,9 @@ type Streamer interface {
// Call the remove with the request and
func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Streamer, payload Payload) (st *TypedStream[Req, Resp], err error) {
if c == nil {
return nil, ErrDisconnected
}
var payloadB []byte
if h.WithPayload {
var err error

View File

@@ -51,14 +51,41 @@ func _() {
_ = x[HandlerMakeBucket-40]
_ = x[HandlerHeadBucket-41]
_ = x[HandlerDeleteBucket-42]
_ = x[handlerTest-43]
_ = x[handlerTest2-44]
_ = x[handlerLast-45]
_ = x[HandlerGetMetrics-43]
_ = x[HandlerGetResourceMetrics-44]
_ = x[HandlerGetMemInfo-45]
_ = x[HandlerGetProcInfo-46]
_ = x[HandlerGetOSInfo-47]
_ = x[HandlerGetPartitions-48]
_ = x[HandlerGetNetInfo-49]
_ = x[HandlerGetCPUs-50]
_ = x[HandlerServerInfo-51]
_ = x[HandlerGetSysConfig-52]
_ = x[HandlerGetSysServices-53]
_ = x[HandlerGetSysErrors-54]
_ = x[HandlerGetAllBucketStats-55]
_ = x[HandlerGetBucketStats-56]
_ = x[HandlerGetSRMetrics-57]
_ = x[HandlerGetPeerMetrics-58]
_ = x[HandlerGetMetacacheListing-59]
_ = x[HandlerUpdateMetacacheListing-60]
_ = x[HandlerGetPeerBucketMetrics-61]
_ = x[HandlerStorageInfo-62]
_ = x[HandlerConsoleLog-63]
_ = x[HandlerListDir-64]
_ = x[HandlerGetLocks-65]
_ = x[HandlerBackgroundHealStatus-66]
_ = x[HandlerGetLastDayTierStats-67]
_ = x[HandlerSignalService-68]
_ = x[HandlerGetBandwidth-69]
_ = x[handlerTest-70]
_ = x[handlerTest2-71]
_ = x[handlerLast-72]
}
const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBuckethandlerTesthandlerTest2handlerLast"
const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthhandlerTesthandlerTest2handlerLast"
var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 526, 538, 549}
var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 880, 892, 903}
func (i HandlerID) String() string {
if i >= HandlerID(len(_HandlerID_index)-1) {

View File

@@ -250,7 +250,7 @@ func (m *Manager) RegisterSingleHandler(id HandlerID, h SingleHandlerFn, subrout
if len(subroute) == 0 {
if m.handlers.hasAny(id) && !id.isTestHandler() {
return ErrHandlerAlreadyExists
return fmt.Errorf("handler %v: %w", id.String(), ErrHandlerAlreadyExists)
}
m.handlers.single[id] = h
@@ -258,7 +258,7 @@ func (m *Manager) RegisterSingleHandler(id HandlerID, h SingleHandlerFn, subrout
}
subID := makeSubHandlerID(id, s)
if m.handlers.hasSubhandler(subID) && !id.isTestHandler() {
return ErrHandlerAlreadyExists
return fmt.Errorf("handler %v, subroute:%v: %w", id.String(), s, ErrHandlerAlreadyExists)
}
m.handlers.subSingle[subID] = h
// Copy so clients can also pick it up for other subpaths.

View File

@@ -18,7 +18,10 @@
package grid
import (
"bytes"
"encoding/json"
"errors"
"math"
"net/url"
"sort"
"strings"
@@ -394,6 +397,137 @@ func (u URLValues) Msgsize() (s int) {
return
}
// JSONPool is a pool for JSON objects that unmarshal into T.
type JSONPool[T any] struct {
pool sync.Pool
emptySz int
}
// NewJSONPool returns a new JSONPool.
func NewJSONPool[T any]() *JSONPool[T] {
var t T
sz := 128
if b, err := json.Marshal(t); err != nil {
sz = len(b)
}
return &JSONPool[T]{
pool: sync.Pool{
New: func() interface{} {
var t T
return &t
},
},
emptySz: sz,
}
}
func (p *JSONPool[T]) new() *T {
var zero T
t := p.pool.Get().(*T)
*t = zero
return t
}
// JSON is a wrapper around a T object that can be serialized.
// There is an internal value
type JSON[T any] struct {
p *JSONPool[T]
val *T
}
// NewJSON returns a new JSONPool.
// No initial value is set.
func (p *JSONPool[T]) NewJSON() *JSON[T] {
var j JSON[T]
j.p = p
return &j
}
// NewJSONWith returns a new JSON with the provided value.
func (p *JSONPool[T]) NewJSONWith(val *T) *JSON[T] {
var j JSON[T]
j.p = p
j.val = val
return &j
}
// Value returns the underlying value.
// If not set yet, a new value is created.
func (j *JSON[T]) Value() *T {
if j.val == nil {
j.val = j.p.new()
}
return j.val
}
// ValueOrZero returns the underlying value.
// If the underlying value is nil, a zero value is returned.
func (j *JSON[T]) ValueOrZero() T {
if j == nil || j.val == nil {
var t T
return t
}
return *j.val
}
// Set the underlying value.
func (j *JSON[T]) Set(v *T) {
j.val = v
}
// Recycle the underlying value.
func (j *JSON[T]) Recycle() {
if j.val != nil {
j.p.pool.Put(j.val)
j.val = nil
}
}
// MarshalMsg implements msgp.Marshaler
func (j *JSON[T]) MarshalMsg(b []byte) (o []byte, err error) {
if j.val == nil {
return msgp.AppendNil(b), nil
}
buf := bytes.NewBuffer(GetByteBuffer()[:0])
defer func() {
PutByteBuffer(buf.Bytes())
}()
enc := json.NewEncoder(buf)
err = enc.Encode(j.val)
if err != nil {
return b, err
}
return msgp.AppendBytes(b, buf.Bytes()), nil
}
// UnmarshalMsg will JSON marshal the value and wrap as a msgp byte array.
// Nil values are supported.
func (j *JSON[T]) UnmarshalMsg(bytes []byte) ([]byte, error) {
if bytes, err := msgp.ReadNilBytes(bytes); err == nil {
if j.val != nil {
j.p.pool.Put(j.val)
}
j.val = nil
return bytes, nil
}
val, bytes, err := msgp.ReadBytesZC(bytes)
if err != nil {
return bytes, err
}
if j.val == nil {
j.val = j.p.new()
} else {
var t T
*j.val = t
}
return bytes, json.Unmarshal(val, j.val)
}
// Msgsize returns the size of an empty JSON object.
func (j *JSON[T]) Msgsize() int {
return j.p.emptySz
}
// NoPayload is a type that can be used for handlers that do not use a payload.
type NoPayload struct{}
@@ -419,3 +553,156 @@ func NewNoPayload() NoPayload {
// Recycle is a no-op.
func (NoPayload) Recycle() {}
// ArrayOf wraps an array of Messagepack compatible objects.
type ArrayOf[T RoundTripper] struct {
aPool sync.Pool // Arrays
ePool sync.Pool // Elements
}
// NewArrayOf returns a new ArrayOf.
// You must provide a function that returns a new instance of T.
func NewArrayOf[T RoundTripper](newFn func() T) *ArrayOf[T] {
return &ArrayOf[T]{
ePool: sync.Pool{New: func() any {
return newFn()
}},
}
}
// New returns a new empty Array.
func (p *ArrayOf[T]) New() *Array[T] {
return &Array[T]{
p: p,
}
}
// NewWith returns a new Array with the provided value (not copied).
func (p *ArrayOf[T]) NewWith(val []T) *Array[T] {
return &Array[T]{
p: p,
val: val,
}
}
func (p *ArrayOf[T]) newA(sz uint32) []T {
t, ok := p.aPool.Get().(*[]T)
if !ok || t == nil {
return make([]T, 0, sz)
}
t2 := *t
return t2[:0]
}
func (p *ArrayOf[T]) putA(v []T) {
for _, t := range v {
p.ePool.Put(t)
}
if v != nil {
v = v[:0]
p.aPool.Put(&v)
}
}
func (p *ArrayOf[T]) newE() T {
return p.ePool.Get().(T)
}
// Array provides a wrapper for an underlying array of serializable objects.
type Array[T RoundTripper] struct {
p *ArrayOf[T]
val []T
}
// Msgsize returns the size of the array in bytes.
func (j *Array[T]) Msgsize() int {
if j.val == nil {
return msgp.NilSize
}
sz := msgp.ArrayHeaderSize
for _, v := range j.val {
sz += v.Msgsize()
}
return sz
}
// Value returns the underlying value.
// Regular append mechanics should be observed.
// If no value has been set yet, a new array is created.
func (j *Array[T]) Value() []T {
if j.val == nil {
j.val = j.p.newA(10)
}
return j.val
}
// Append a value to the underlying array.
// The returned Array is always the same as the one called.
func (j *Array[T]) Append(v ...T) *Array[T] {
if j.val == nil {
j.val = j.p.newA(uint32(len(v)))
}
j.val = append(j.val, v...)
return j
}
// Set the underlying value.
func (j *Array[T]) Set(val []T) {
j.val = val
}
// Recycle the underlying value.
func (j *Array[T]) Recycle() {
if j.val != nil {
j.p.putA(j.val)
j.val = nil
}
}
// MarshalMsg implements msgp.Marshaler
func (j *Array[T]) MarshalMsg(b []byte) (o []byte, err error) {
if j.val == nil {
return msgp.AppendNil(b), nil
}
if uint64(len(j.val)) > math.MaxUint32 {
return b, errors.New("array: length of array exceeds math.MaxUint32")
}
b = msgp.AppendArrayHeader(b, uint32(len(j.val)))
for _, v := range j.val {
b, err = v.MarshalMsg(b)
if err != nil {
return b, err
}
}
return b, err
}
// UnmarshalMsg will JSON marshal the value and wrap as a msgp byte array.
// Nil values are supported.
func (j *Array[T]) UnmarshalMsg(bytes []byte) ([]byte, error) {
if bytes, err := msgp.ReadNilBytes(bytes); err == nil {
if j.val != nil {
j.p.putA(j.val)
}
j.val = nil
return bytes, nil
}
l, bytes, err := msgp.ReadArrayHeaderBytes(bytes)
if err != nil {
return bytes, err
}
if j.val == nil {
j.val = j.p.newA(l)
} else {
j.val = j.val[:0]
}
for i := uint32(0); i < l; i++ {
v := j.p.newE()
bytes, err = v.UnmarshalMsg(bytes)
if err != nil {
return bytes, err
}
j.val = append(j.val, v)
}
return bytes, nil
}