mirror of https://github.com/minio/minio.git
809 lines
24 KiB
Go
809 lines
24 KiB
Go
// Copyright (c) 2015-2023 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package grid
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/minio/minio/internal/hash/sha256"
|
|
xioutil "github.com/minio/minio/internal/ioutil"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/tinylib/msgp/msgp"
|
|
)
|
|
|
|
//go:generate stringer -type=HandlerID -output=handlers_string.go -trimprefix=Handler msg.go $GOFILE
|
|
|
|
// HandlerID is a handler identifier.
|
|
// It is used to determine request routing on the server.
|
|
// Handlers can be registered with a static subroute.
|
|
const (
|
|
// handlerInvalid is reserved to check for uninitialized values.
|
|
handlerInvalid HandlerID = iota
|
|
HandlerLockLock
|
|
HandlerLockRLock
|
|
HandlerLockUnlock
|
|
HandlerLockRUnlock
|
|
HandlerLockRefresh
|
|
HandlerLockForceUnlock
|
|
HandlerWalkDir
|
|
HandlerStatVol
|
|
HandlerDiskInfo
|
|
HandlerNSScanner
|
|
HandlerReadXL
|
|
HandlerReadVersion
|
|
HandlerDeleteFile
|
|
HandlerDeleteVersion
|
|
HandlerUpdateMetadata
|
|
HandlerWriteMetadata
|
|
HandlerCheckParts
|
|
HandlerRenameData
|
|
HandlerRenameFile
|
|
HandlerReadAll
|
|
HandlerServerVerify
|
|
HandlerTrace
|
|
HandlerListen
|
|
HandlerGetLocalDiskIDs
|
|
HandlerDeleteBucketMetadata
|
|
HandlerLoadBucketMetadata
|
|
HandlerReloadSiteReplicationConfig
|
|
HandlerReloadPoolMeta
|
|
HandlerStopRebalance
|
|
HandlerLoadRebalanceMeta
|
|
HandlerLoadTransitionTierConfig
|
|
|
|
HandlerDeletePolicy
|
|
HandlerLoadPolicy
|
|
HandlerLoadPolicyMapping
|
|
HandlerDeleteServiceAccount
|
|
HandlerLoadServiceAccount
|
|
HandlerDeleteUser
|
|
HandlerLoadUser
|
|
HandlerLoadGroup
|
|
|
|
// Add more above here ^^^
|
|
// If all handlers are used, the type of Handler can be changed.
|
|
// Handlers have no versioning, so non-compatible handler changes must result in new IDs.
|
|
handlerTest
|
|
handlerTest2
|
|
handlerLast
|
|
)
|
|
|
|
// handlerPrefixes are prefixes for handler IDs used for tracing.
|
|
// If a handler is not listed here, it will be traced with "grid" prefix.
|
|
var handlerPrefixes = [handlerLast]string{
|
|
HandlerLockLock: lockPrefix,
|
|
HandlerLockRLock: lockPrefix,
|
|
HandlerLockUnlock: lockPrefix,
|
|
HandlerLockRUnlock: lockPrefix,
|
|
HandlerLockRefresh: lockPrefix,
|
|
HandlerLockForceUnlock: lockPrefix,
|
|
HandlerWalkDir: storagePrefix,
|
|
HandlerStatVol: storagePrefix,
|
|
HandlerDiskInfo: storagePrefix,
|
|
HandlerNSScanner: storagePrefix,
|
|
HandlerReadXL: storagePrefix,
|
|
HandlerReadVersion: storagePrefix,
|
|
HandlerDeleteFile: storagePrefix,
|
|
HandlerDeleteVersion: storagePrefix,
|
|
HandlerUpdateMetadata: storagePrefix,
|
|
HandlerWriteMetadata: storagePrefix,
|
|
HandlerCheckParts: storagePrefix,
|
|
HandlerRenameData: storagePrefix,
|
|
HandlerRenameFile: storagePrefix,
|
|
HandlerReadAll: storagePrefix,
|
|
HandlerServerVerify: bootstrapPrefix,
|
|
HandlerTrace: peerPrefix,
|
|
HandlerListen: peerPrefix,
|
|
HandlerGetLocalDiskIDs: peerPrefix,
|
|
HandlerDeleteBucketMetadata: peerPrefix,
|
|
HandlerLoadBucketMetadata: peerPrefix,
|
|
HandlerReloadSiteReplicationConfig: peerPrefix,
|
|
HandlerReloadPoolMeta: peerPrefix,
|
|
HandlerStopRebalance: peerPrefix,
|
|
HandlerLoadRebalanceMeta: peerPrefix,
|
|
HandlerLoadTransitionTierConfig: peerPrefix,
|
|
HandlerDeletePolicy: peerPrefix,
|
|
HandlerLoadPolicy: peerPrefix,
|
|
HandlerLoadPolicyMapping: peerPrefix,
|
|
HandlerDeleteServiceAccount: peerPrefix,
|
|
HandlerLoadServiceAccount: peerPrefix,
|
|
HandlerDeleteUser: peerPrefix,
|
|
HandlerLoadUser: peerPrefix,
|
|
HandlerLoadGroup: peerPrefix,
|
|
}
|
|
|
|
const (
|
|
lockPrefix = "lockR"
|
|
storagePrefix = "storageR"
|
|
bootstrapPrefix = "bootstrap"
|
|
peerPrefix = "peer"
|
|
)
|
|
|
|
func init() {
|
|
// Static check if we exceed 255 handler ids.
|
|
// Extend the type to uint16 when hit.
|
|
if handlerLast > 255 {
|
|
panic(fmt.Sprintf("out of handler IDs. %d > %d", handlerLast, 255))
|
|
}
|
|
}
|
|
|
|
func (h HandlerID) valid() bool {
|
|
return h != handlerInvalid && h < handlerLast
|
|
}
|
|
|
|
func (h HandlerID) isTestHandler() bool {
|
|
return h >= handlerTest && h <= handlerTest2
|
|
}
|
|
|
|
// RemoteErr is a remote error type.
|
|
// Any error seen on a remote will be returned like this.
|
|
type RemoteErr string
|
|
|
|
// NewRemoteErr creates a new remote error.
|
|
// The error type is not preserved.
|
|
func NewRemoteErr(err error) *RemoteErr {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
r := RemoteErr(err.Error())
|
|
return &r
|
|
}
|
|
|
|
// NewRemoteErrf creates a new remote error from a format string.
|
|
func NewRemoteErrf(format string, a ...any) *RemoteErr {
|
|
r := RemoteErr(fmt.Sprintf(format, a...))
|
|
return &r
|
|
}
|
|
|
|
// NewNPErr is a helper to no payload and optional remote error.
|
|
// The error type is not preserved.
|
|
func NewNPErr(err error) (NoPayload, *RemoteErr) {
|
|
if err == nil {
|
|
return NoPayload{}, nil
|
|
}
|
|
r := RemoteErr(err.Error())
|
|
return NoPayload{}, &r
|
|
}
|
|
|
|
// NewRemoteErrString creates a new remote error from a string.
|
|
func NewRemoteErrString(msg string) *RemoteErr {
|
|
r := RemoteErr(msg)
|
|
return &r
|
|
}
|
|
|
|
func (r RemoteErr) Error() string {
|
|
return string(r)
|
|
}
|
|
|
|
// Is returns if the string representation matches.
|
|
func (r *RemoteErr) Is(other error) bool {
|
|
if r == nil || other == nil {
|
|
return r == other
|
|
}
|
|
var o RemoteErr
|
|
if errors.As(other, &o) {
|
|
return r == &o
|
|
}
|
|
return false
|
|
}
|
|
|
|
// IsRemoteErr returns the value if the error is a RemoteErr.
|
|
func IsRemoteErr(err error) *RemoteErr {
|
|
var r RemoteErr
|
|
if errors.As(err, &r) {
|
|
return &r
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type (
|
|
// SingleHandlerFn is handlers for one to one requests.
|
|
// A non-nil error value will be returned as RemoteErr(msg) to client.
|
|
// No client information or cancellation (deadline) is available.
|
|
// Include this in payload if needed.
|
|
// Payload should be recycled with PutByteBuffer if not needed after the call.
|
|
SingleHandlerFn func(payload []byte) ([]byte, *RemoteErr)
|
|
|
|
// StatelessHandlerFn must handle incoming stateless request.
|
|
// A non-nil error value will be returned as RemoteErr(msg) to client.
|
|
StatelessHandlerFn func(ctx context.Context, payload []byte, resp chan<- []byte) *RemoteErr
|
|
|
|
// StatelessHandler is handlers for one to many requests,
|
|
// where responses may be dropped.
|
|
// Stateless requests provide no incoming stream and there is no flow control
|
|
// on outgoing messages.
|
|
StatelessHandler struct {
|
|
Handle StatelessHandlerFn
|
|
// OutCapacity is the output capacity on the caller.
|
|
// If <= 0 capacity will be 1.
|
|
OutCapacity int
|
|
}
|
|
|
|
// StreamHandlerFn must process a request with an optional initial payload.
|
|
// It must keep consuming from 'in' until it returns.
|
|
// 'in' and 'out' are independent.
|
|
// The handler should never close out.
|
|
// Buffers received from 'in' can be recycled with PutByteBuffer.
|
|
// Buffers sent on out can not be referenced once sent.
|
|
StreamHandlerFn func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *RemoteErr
|
|
|
|
// StreamHandler handles fully bidirectional streams,
|
|
// There is flow control in both directions.
|
|
StreamHandler struct {
|
|
// Handle an incoming request. Initial payload is sent.
|
|
// Additional input packets (if any) are streamed to request.
|
|
// Upstream will block when request channel is full.
|
|
// Response packets can be sent at any time.
|
|
// Any non-nil error sent as response means no more responses are sent.
|
|
Handle StreamHandlerFn
|
|
|
|
// Subroute for handler.
|
|
// Subroute must be static and clients should specify a matching subroute.
|
|
// Should not be set unless there are different handlers for the same HandlerID.
|
|
Subroute string
|
|
|
|
// OutCapacity is the output capacity. If <= 0 capacity will be 1.
|
|
OutCapacity int
|
|
|
|
// InCapacity is the output capacity.
|
|
// If == 0 no input is expected
|
|
InCapacity int
|
|
}
|
|
)
|
|
|
|
type subHandlerID [32]byte
|
|
|
|
func makeSubHandlerID(id HandlerID, subRoute string) subHandlerID {
|
|
b := subHandlerID(sha256.Sum256([]byte(subRoute)))
|
|
b[0] = byte(id)
|
|
b[1] = 0 // Reserved
|
|
return b
|
|
}
|
|
|
|
func (s subHandlerID) withHandler(id HandlerID) subHandlerID {
|
|
s[0] = byte(id)
|
|
s[1] = 0 // Reserved
|
|
return s
|
|
}
|
|
|
|
func (s *subHandlerID) String() string {
|
|
if s == nil {
|
|
return ""
|
|
}
|
|
return hex.EncodeToString(s[:])
|
|
}
|
|
|
|
func makeZeroSubHandlerID(id HandlerID) subHandlerID {
|
|
return subHandlerID{byte(id)}
|
|
}
|
|
|
|
type handlers struct {
|
|
single [handlerLast]SingleHandlerFn
|
|
stateless [handlerLast]*StatelessHandler
|
|
streams [handlerLast]*StreamHandler
|
|
|
|
subSingle map[subHandlerID]SingleHandlerFn
|
|
subStateless map[subHandlerID]*StatelessHandler
|
|
subStreams map[subHandlerID]*StreamHandler
|
|
}
|
|
|
|
func (h *handlers) init() {
|
|
h.subSingle = make(map[subHandlerID]SingleHandlerFn)
|
|
h.subStateless = make(map[subHandlerID]*StatelessHandler)
|
|
h.subStreams = make(map[subHandlerID]*StreamHandler)
|
|
}
|
|
|
|
func (h *handlers) hasAny(id HandlerID) bool {
|
|
if !id.valid() {
|
|
return false
|
|
}
|
|
return h.single[id] != nil || h.stateless[id] != nil || h.streams[id] != nil
|
|
}
|
|
|
|
func (h *handlers) hasSubhandler(id subHandlerID) bool {
|
|
return h.subSingle[id] != nil || h.subStateless[id] != nil || h.subStreams[id] != nil
|
|
}
|
|
|
|
// RoundTripper provides an interface for type roundtrip serialization.
|
|
type RoundTripper interface {
|
|
msgp.Unmarshaler
|
|
msgp.Marshaler
|
|
msgp.Sizer
|
|
|
|
comparable
|
|
}
|
|
|
|
// SingleHandler is a type safe handler for single roundtrip requests.
|
|
type SingleHandler[Req, Resp RoundTripper] struct {
|
|
id HandlerID
|
|
sharedResp bool
|
|
callReuseReq bool
|
|
|
|
newReq func() Req
|
|
newResp func() Resp
|
|
|
|
recycleReq func(Req)
|
|
recycleResp func(Resp)
|
|
}
|
|
|
|
func recycleFunc[RT RoundTripper](newRT func() RT) (newFn func() RT, recycle func(r RT)) {
|
|
rAny := any(newRT())
|
|
var rZero RT
|
|
if _, ok := rAny.(Recycler); ok {
|
|
return newRT, func(r RT) {
|
|
if r != rZero {
|
|
if rc, ok := any(r).(Recycler); ok {
|
|
rc.Recycle()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
pool := sync.Pool{
|
|
New: func() interface{} {
|
|
return newRT()
|
|
},
|
|
}
|
|
return func() RT { return pool.Get().(RT) },
|
|
func(r RT) {
|
|
if r != rZero {
|
|
pool.Put(r)
|
|
}
|
|
}
|
|
}
|
|
|
|
// NewSingleHandler creates a typed handler that can provide Marshal/Unmarshal.
|
|
// Use Register to register a server handler.
|
|
// Use Call to initiate a clientside call.
|
|
func NewSingleHandler[Req, Resp RoundTripper](h HandlerID, newReq func() Req, newResp func() Resp) *SingleHandler[Req, Resp] {
|
|
s := SingleHandler[Req, Resp]{id: h}
|
|
s.newReq, s.recycleReq = recycleFunc[Req](newReq)
|
|
s.newResp, s.recycleResp = recycleFunc[Resp](newResp)
|
|
if _, ok := any(newReq()).(Recycler); ok {
|
|
s.callReuseReq = true
|
|
}
|
|
return &s
|
|
}
|
|
|
|
// PutResponse will accept a response for reuse.
|
|
// This can be used by a caller to recycle a response after receiving it from a Call.
|
|
func (h *SingleHandler[Req, Resp]) PutResponse(r Resp) {
|
|
h.recycleResp(r)
|
|
}
|
|
|
|
// AllowCallRequestPool indicates it is safe to reuse the request
|
|
// on the client side, meaning the request is recycled/pooled when a request is sent.
|
|
// CAREFUL: This should only be used when there are no pointers, slices that aren't freshly constructed.
|
|
func (h *SingleHandler[Req, Resp]) AllowCallRequestPool(b bool) *SingleHandler[Req, Resp] {
|
|
h.callReuseReq = b
|
|
return h
|
|
}
|
|
|
|
// WithSharedResponse indicates it is unsafe to reuse the response
|
|
// when it has been returned on a handler.
|
|
// This will disable automatic response recycling/pooling.
|
|
// Typically this is used when the response sharing part of its data structure.
|
|
func (h *SingleHandler[Req, Resp]) WithSharedResponse() *SingleHandler[Req, Resp] {
|
|
h.sharedResp = true
|
|
return h
|
|
}
|
|
|
|
// NewResponse creates a new response.
|
|
// Handlers can use this to create a reusable response.
|
|
// The response may be reused, so caller should clear any fields.
|
|
func (h *SingleHandler[Req, Resp]) NewResponse() Resp {
|
|
return h.newResp()
|
|
}
|
|
|
|
// NewRequest creates a new request.
|
|
// Handlers can use this to create a reusable request.
|
|
// The request may be reused, so caller should clear any fields.
|
|
func (h *SingleHandler[Req, Resp]) NewRequest() Req {
|
|
return h.newReq()
|
|
}
|
|
|
|
// Register a handler for a Req -> Resp roundtrip.
|
|
// Requests are automatically recycled.
|
|
func (h *SingleHandler[Req, Resp]) Register(m *Manager, handle func(req Req) (resp Resp, err *RemoteErr), subroute ...string) error {
|
|
if h.newReq == nil {
|
|
return errors.New("newReq nil in NewSingleHandler")
|
|
}
|
|
if h.newResp == nil {
|
|
return errors.New("newResp nil in NewSingleHandler")
|
|
}
|
|
return m.RegisterSingleHandler(h.id, func(payload []byte) ([]byte, *RemoteErr) {
|
|
req := h.NewRequest()
|
|
_, err := req.UnmarshalMsg(payload)
|
|
if err != nil {
|
|
PutByteBuffer(payload)
|
|
r := RemoteErr(err.Error())
|
|
return nil, &r
|
|
}
|
|
resp, rerr := handle(req)
|
|
h.recycleReq(req)
|
|
|
|
if rerr != nil {
|
|
PutByteBuffer(payload)
|
|
return nil, rerr
|
|
}
|
|
payload, err = resp.MarshalMsg(payload[:0])
|
|
if !h.sharedResp {
|
|
h.PutResponse(resp)
|
|
}
|
|
if err != nil {
|
|
PutByteBuffer(payload)
|
|
r := RemoteErr(err.Error())
|
|
return nil, &r
|
|
}
|
|
return payload, nil
|
|
}, subroute...)
|
|
}
|
|
|
|
// Requester is able to send requests to a remote.
|
|
type Requester interface {
|
|
Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)
|
|
}
|
|
|
|
// Call the remote with the request and return the response.
|
|
// The response should be returned with PutResponse when no error.
|
|
// If no deadline is set, a 1-minute deadline is added.
|
|
func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Req) (resp Resp, err error) {
|
|
payload, err := req.MarshalMsg(GetByteBuffer()[:0])
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
switch any(req).(type) {
|
|
case *MSS, *Bytes, *URLValues:
|
|
ctx = context.WithValue(ctx, TraceParamsKey{}, req)
|
|
case *NoPayload:
|
|
default:
|
|
ctx = context.WithValue(ctx, TraceParamsKey{}, fmt.Sprintf("type=%T", req))
|
|
}
|
|
if h.callReuseReq {
|
|
defer h.recycleReq(req)
|
|
}
|
|
res, err := c.Request(ctx, h.id, payload)
|
|
PutByteBuffer(payload)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
defer PutByteBuffer(res)
|
|
r := h.NewResponse()
|
|
_, err = r.UnmarshalMsg(res)
|
|
if err != nil {
|
|
h.PutResponse(r)
|
|
return resp, err
|
|
}
|
|
return r, err
|
|
}
|
|
|
|
// RemoteClient contains information about the caller.
|
|
type RemoteClient struct {
|
|
Name string
|
|
}
|
|
|
|
type (
|
|
ctxCallerKey = struct{}
|
|
ctxSubrouteKey = struct{}
|
|
)
|
|
|
|
// GetCaller returns caller information from contexts provided to handlers.
|
|
func GetCaller(ctx context.Context) *RemoteClient {
|
|
val, _ := ctx.Value(ctxCallerKey{}).(*RemoteClient)
|
|
return val
|
|
}
|
|
|
|
// GetSubroute returns caller information from contexts provided to handlers.
|
|
func GetSubroute(ctx context.Context) string {
|
|
val, _ := ctx.Value(ctxSubrouteKey{}).(string)
|
|
return val
|
|
}
|
|
|
|
func setCaller(ctx context.Context, cl *RemoteClient) context.Context {
|
|
return context.WithValue(ctx, ctxCallerKey{}, cl)
|
|
}
|
|
|
|
func setSubroute(ctx context.Context, s string) context.Context {
|
|
return context.WithValue(ctx, ctxSubrouteKey{}, s)
|
|
}
|
|
|
|
// StreamTypeHandler is a type safe handler for streaming requests.
|
|
type StreamTypeHandler[Payload, Req, Resp RoundTripper] struct {
|
|
WithPayload bool
|
|
|
|
// Override the default capacities (1)
|
|
OutCapacity int
|
|
|
|
// Set to 0 if no input is expected.
|
|
// Will be 0 if newReq is nil.
|
|
InCapacity int
|
|
|
|
reqPool sync.Pool
|
|
respPool sync.Pool
|
|
id HandlerID
|
|
newPayload func() Payload
|
|
nilReq Req
|
|
nilResp Resp
|
|
sharedResponse bool
|
|
}
|
|
|
|
// NewStream creates a typed handler that can provide Marshal/Unmarshal.
|
|
// Use Register to register a server handler.
|
|
// Use Call to initiate a clientside call.
|
|
// newPayload can be nil. In that case payloads will always be nil.
|
|
// newReq can be nil. In that case no input stream is expected and the handler will be called with nil 'in' channel.
|
|
func NewStream[Payload, Req, Resp RoundTripper](h HandlerID, newPayload func() Payload, newReq func() Req, newResp func() Resp) *StreamTypeHandler[Payload, Req, Resp] {
|
|
if newResp == nil {
|
|
panic("newResp missing in NewStream")
|
|
}
|
|
|
|
s := newStreamHandler[Payload, Req, Resp](h)
|
|
if newReq != nil {
|
|
s.reqPool.New = func() interface{} {
|
|
return newReq()
|
|
}
|
|
} else {
|
|
s.InCapacity = 0
|
|
}
|
|
s.respPool.New = func() interface{} {
|
|
return newResp()
|
|
}
|
|
s.newPayload = newPayload
|
|
s.WithPayload = newPayload != nil
|
|
return s
|
|
}
|
|
|
|
// WithSharedResponse indicates it is unsafe to reuse the response.
|
|
// Typically this is used when the response sharing part of its data structure.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) WithSharedResponse() *StreamTypeHandler[Payload, Req, Resp] {
|
|
h.sharedResponse = true
|
|
return h
|
|
}
|
|
|
|
// NewPayload creates a new payload.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) NewPayload() Payload {
|
|
return h.newPayload()
|
|
}
|
|
|
|
// NewRequest creates a new request.
|
|
// The struct may be reused, so caller should clear any fields.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) NewRequest() Req {
|
|
return h.reqPool.Get().(Req)
|
|
}
|
|
|
|
// PutRequest will accept a request for reuse.
|
|
// These should be returned by the handler.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) PutRequest(r Req) {
|
|
if r != h.nilReq {
|
|
h.reqPool.Put(r)
|
|
}
|
|
}
|
|
|
|
// PutResponse will accept a response for reuse.
|
|
// These should be returned by the caller.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) PutResponse(r Resp) {
|
|
if r != h.nilResp {
|
|
h.respPool.Put(r)
|
|
}
|
|
}
|
|
|
|
// NewResponse creates a new response.
|
|
// Handlers can use this to create a reusable response.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) NewResponse() Resp {
|
|
return h.respPool.Get().(Resp)
|
|
}
|
|
|
|
func newStreamHandler[Payload, Req, Resp RoundTripper](h HandlerID) *StreamTypeHandler[Payload, Req, Resp] {
|
|
return &StreamTypeHandler[Payload, Req, Resp]{id: h, InCapacity: 1, OutCapacity: 1}
|
|
}
|
|
|
|
// Register a handler for two-way streaming with payload, input stream and output stream.
|
|
// An optional subroute can be given. Multiple entries are joined with '/'.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) Register(m *Manager, handle func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error {
|
|
return h.register(m, handle, subroute...)
|
|
}
|
|
|
|
// WithOutCapacity adjusts the output capacity from the handler perspective.
|
|
// This must be done prior to registering the handler.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) WithOutCapacity(out int) *StreamTypeHandler[Payload, Req, Resp] {
|
|
h.OutCapacity = out
|
|
return h
|
|
}
|
|
|
|
// WithInCapacity adjusts the input capacity from the handler perspective.
|
|
// This must be done prior to registering the handler.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) WithInCapacity(in int) *StreamTypeHandler[Payload, Req, Resp] {
|
|
h.InCapacity = in
|
|
return h
|
|
}
|
|
|
|
// RegisterNoInput a handler for one-way streaming with payload and output stream.
|
|
// An optional subroute can be given. Multiple entries are joined with '/'.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput(m *Manager, handle func(ctx context.Context, p Payload, out chan<- Resp) *RemoteErr, subroute ...string) error {
|
|
h.InCapacity = 0
|
|
return h.register(m, func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr {
|
|
return handle(ctx, p, out)
|
|
}, subroute...)
|
|
}
|
|
|
|
// RegisterNoPayload a handler for one-way streaming with payload and output stream.
|
|
// An optional subroute can be given. Multiple entries are joined with '/'.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoPayload(m *Manager, handle func(ctx context.Context, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error {
|
|
h.WithPayload = false
|
|
return h.register(m, func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr {
|
|
return handle(ctx, in, out)
|
|
}, subroute...)
|
|
}
|
|
|
|
// Register a handler for two-way streaming with optional payload and input stream.
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error {
|
|
return m.RegisterStreamingHandler(h.id, StreamHandler{
|
|
Handle: func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *RemoteErr {
|
|
var plT Payload
|
|
if h.WithPayload {
|
|
plT = h.NewPayload()
|
|
_, err := plT.UnmarshalMsg(payload)
|
|
PutByteBuffer(payload)
|
|
if err != nil {
|
|
r := RemoteErr(err.Error())
|
|
return &r
|
|
}
|
|
}
|
|
|
|
var inT chan Req
|
|
if h.InCapacity > 0 {
|
|
// Don't add extra buffering
|
|
inT = make(chan Req)
|
|
go func() {
|
|
defer xioutil.SafeClose(inT)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case v, ok := <-in:
|
|
if !ok {
|
|
return
|
|
}
|
|
input := h.NewRequest()
|
|
_, err := input.UnmarshalMsg(v)
|
|
if err != nil {
|
|
logger.LogOnceIf(ctx, err, err.Error())
|
|
}
|
|
PutByteBuffer(v)
|
|
// Send input
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case inT <- input:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
outT := make(chan Resp)
|
|
outDone := make(chan struct{})
|
|
go func() {
|
|
defer xioutil.SafeClose(outDone)
|
|
dropOutput := false
|
|
for v := range outT {
|
|
if dropOutput {
|
|
continue
|
|
}
|
|
dst := GetByteBuffer()
|
|
dst, err := v.MarshalMsg(dst[:0])
|
|
if err != nil {
|
|
logger.LogOnceIf(ctx, err, err.Error())
|
|
}
|
|
if !h.sharedResponse {
|
|
h.PutResponse(v)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
dropOutput = true
|
|
case out <- dst:
|
|
}
|
|
}
|
|
}()
|
|
rErr := handle(ctx, plT, inT, outT)
|
|
xioutil.SafeClose(outT)
|
|
<-outDone
|
|
return rErr
|
|
}, OutCapacity: h.OutCapacity, InCapacity: h.InCapacity, Subroute: strings.Join(subroute, "/"),
|
|
})
|
|
}
|
|
|
|
// TypedStream is a stream with specific types.
|
|
type TypedStream[Req, Resp RoundTripper] struct {
|
|
// responses from the remote server.
|
|
// Channel will be closed after error or when remote closes.
|
|
// responses *must* be read to either an error is returned or the channel is closed.
|
|
responses *Stream
|
|
newResp func() Resp
|
|
|
|
// Requests sent to the server.
|
|
// If the handler is defined with 0 incoming capacity this will be nil.
|
|
// Channel *must* be closed to signal the end of the stream.
|
|
// If the request context is canceled, the stream will no longer process requests.
|
|
Requests chan<- Req
|
|
}
|
|
|
|
// Results returns the results from the remote server one by one.
|
|
// If any error is returned by the callback, the stream will be canceled.
|
|
// If the context is canceled, the stream will be canceled.
|
|
func (s *TypedStream[Req, Resp]) Results(next func(resp Resp) error) (err error) {
|
|
return s.responses.Results(func(b []byte) error {
|
|
resp := s.newResp()
|
|
_, err := resp.UnmarshalMsg(b)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return next(resp)
|
|
})
|
|
}
|
|
|
|
// Streamer creates a stream.
|
|
type Streamer interface {
|
|
NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)
|
|
}
|
|
|
|
// Call the remove with the request and
|
|
func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Streamer, payload Payload) (st *TypedStream[Req, Resp], err error) {
|
|
var payloadB []byte
|
|
if h.WithPayload {
|
|
var err error
|
|
payloadB, err = payload.MarshalMsg(GetByteBuffer()[:0])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
stream, err := c.NewStream(ctx, h.id, payloadB)
|
|
PutByteBuffer(payloadB)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// respT := make(chan TypedResponse[Resp])
|
|
var reqT chan Req
|
|
if h.InCapacity > 0 {
|
|
reqT = make(chan Req)
|
|
// Request handler
|
|
if stream.Requests == nil {
|
|
return nil, fmt.Errorf("internal error: stream request channel nil")
|
|
}
|
|
go func() {
|
|
defer xioutil.SafeClose(stream.Requests)
|
|
for req := range reqT {
|
|
b, err := req.MarshalMsg(GetByteBuffer()[:0])
|
|
if err != nil {
|
|
logger.LogOnceIf(ctx, err, err.Error())
|
|
}
|
|
h.PutRequest(req)
|
|
stream.Requests <- b
|
|
}
|
|
}()
|
|
} else if stream.Requests != nil {
|
|
xioutil.SafeClose(stream.Requests)
|
|
}
|
|
|
|
return &TypedStream[Req, Resp]{responses: stream, newResp: h.NewResponse, Requests: reqT}, nil
|
|
}
|