// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package grid

import (
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
	"sync"

	"github.com/minio/minio/internal/hash/sha256"
	xioutil "github.com/minio/minio/internal/ioutil"
	"github.com/minio/minio/internal/logger"
	"github.com/tinylib/msgp/msgp"
)

//go:generate stringer -type=HandlerID -output=handlers_string.go -trimprefix=Handler msg.go $GOFILE

// HandlerID is a handler identifier.
// It is used to determine request routing on the server.
// Handlers can be registered with a static subroute.
// Do NOT remove or change the order of existing handlers.
const (
	// handlerInvalid is reserved to check for uninitialized values.
	handlerInvalid HandlerID = iota
	HandlerLockLock
	HandlerLockRLock
	HandlerLockUnlock
	HandlerLockRUnlock
	HandlerLockRefresh
	HandlerLockForceUnlock
	HandlerWalkDir
	HandlerStatVol
	HandlerDiskInfo
	HandlerNSScanner
	HandlerReadXL
	HandlerReadVersion
	HandlerDeleteFile
	HandlerDeleteVersion
	HandlerUpdateMetadata
	HandlerWriteMetadata
	HandlerCheckParts
	HandlerRenameData
	HandlerRenameFile
	HandlerReadAll
	HandlerServerVerify
	HandlerTrace
	HandlerListen
	HandlerDeleteBucketMetadata
	HandlerLoadBucketMetadata
	HandlerReloadSiteReplicationConfig
	HandlerReloadPoolMeta
	HandlerStopRebalance
	HandlerLoadRebalanceMeta
	HandlerLoadTransitionTierConfig
	HandlerDeletePolicy
	HandlerLoadPolicy
	HandlerLoadPolicyMapping
	HandlerDeleteServiceAccount
	HandlerLoadServiceAccount
	HandlerDeleteUser
	HandlerLoadUser
	HandlerLoadGroup
	HandlerHealBucket
	HandlerMakeBucket
	HandlerHeadBucket
	HandlerDeleteBucket
	HandlerGetMetrics
	HandlerGetResourceMetrics
	HandlerGetMemInfo
	HandlerGetProcInfo
	HandlerGetOSInfo
	HandlerGetPartitions
	HandlerGetNetInfo
	HandlerGetCPUs
	HandlerServerInfo
	HandlerGetSysConfig
	HandlerGetSysServices
	HandlerGetSysErrors
	HandlerGetAllBucketStats
	HandlerGetBucketStats
	HandlerGetSRMetrics
	HandlerGetPeerMetrics
	HandlerGetMetacacheListing
	HandlerUpdateMetacacheListing
	HandlerGetPeerBucketMetrics
	HandlerStorageInfo
	HandlerConsoleLog
	HandlerListDir
	HandlerGetLocks
	HandlerBackgroundHealStatus
	HandlerGetLastDayTierStats
	HandlerSignalService
	HandlerGetBandwidth
	HandlerWriteAll
	HandlerListBuckets

	// Add more above here ^^^
	// If all handlers are used, the type of Handler can be changed.
	// Handlers have no versioning, so non-compatible handler changes must result in new IDs.
	handlerTest
	handlerTest2
	handlerLast
)

// handlerPrefixes are prefixes for handler IDs used for tracing.
// If a handler is not listed here, it will be traced with "grid" prefix.
var handlerPrefixes = [handlerLast]string{
	HandlerLockLock:                    lockPrefix,
	HandlerLockRLock:                   lockPrefix,
	HandlerLockUnlock:                  lockPrefix,
	HandlerLockRUnlock:                 lockPrefix,
	HandlerLockRefresh:                 lockPrefix,
	HandlerLockForceUnlock:             lockPrefix,
	HandlerWalkDir:                     storagePrefix,
	HandlerStatVol:                     storagePrefix,
	HandlerDiskInfo:                    storagePrefix,
	HandlerNSScanner:                   storagePrefix,
	HandlerReadXL:                      storagePrefix,
	HandlerReadVersion:                 storagePrefix,
	HandlerDeleteFile:                  storagePrefix,
	HandlerDeleteVersion:               storagePrefix,
	HandlerUpdateMetadata:              storagePrefix,
	HandlerWriteMetadata:               storagePrefix,
	HandlerCheckParts:                  storagePrefix,
	HandlerRenameData:                  storagePrefix,
	HandlerRenameFile:                  storagePrefix,
	HandlerReadAll:                     storagePrefix,
	HandlerWriteAll:                    storagePrefix,
	HandlerServerVerify:                bootstrapPrefix,
	HandlerTrace:                       peerPrefix,
	HandlerListen:                      peerPrefix,
	HandlerDeleteBucketMetadata:        peerPrefix,
	HandlerLoadBucketMetadata:          peerPrefix,
	HandlerReloadSiteReplicationConfig: peerPrefix,
	HandlerReloadPoolMeta:              peerPrefix,
	HandlerStopRebalance:               peerPrefix,
	HandlerLoadRebalanceMeta:           peerPrefix,
	HandlerLoadTransitionTierConfig:    peerPrefix,
	HandlerDeletePolicy:                peerPrefix,
	HandlerLoadPolicy:                  peerPrefix,
	HandlerLoadPolicyMapping:           peerPrefix,
	HandlerDeleteServiceAccount:        peerPrefix,
	HandlerLoadServiceAccount:          peerPrefix,
	HandlerDeleteUser:                  peerPrefix,
	HandlerLoadUser:                    peerPrefix,
	HandlerLoadGroup:                   peerPrefix,
	HandlerMakeBucket:                  peerPrefixS3,
	HandlerHeadBucket:                  peerPrefixS3,
	HandlerDeleteBucket:                peerPrefixS3,
	HandlerHealBucket:                  healPrefix,
	HandlerGetMetrics:                  peerPrefix,
	HandlerGetResourceMetrics:          peerPrefix,
	HandlerGetMemInfo:                  peerPrefix,
	HandlerGetProcInfo:                 peerPrefix,
	HandlerGetOSInfo:                   peerPrefix,
	HandlerGetPartitions:               peerPrefix,
	HandlerGetNetInfo:                  peerPrefix,
	HandlerGetCPUs:                     peerPrefix,
	HandlerServerInfo:                  peerPrefix,
	HandlerGetSysConfig:                peerPrefix,
	HandlerGetSysServices:              peerPrefix,
	HandlerGetSysErrors:                peerPrefix,
	HandlerGetAllBucketStats:           peerPrefix,
	HandlerGetBucketStats:              peerPrefix,
	HandlerGetSRMetrics:                peerPrefix,
	HandlerGetPeerMetrics:              peerPrefix,
	HandlerGetMetacacheListing:         peerPrefix,
	HandlerUpdateMetacacheListing:      peerPrefix,
	HandlerGetPeerBucketMetrics:        peerPrefix,
	HandlerStorageInfo:                 peerPrefix,
	HandlerConsoleLog:                  peerPrefix,
	HandlerListDir:                     storagePrefix,
	HandlerListBuckets:                 peerPrefixS3,
}

const (
	lockPrefix      = "lockR"
	storagePrefix   = "storageR"
	bootstrapPrefix = "bootstrap"
	peerPrefix      = "peer"
	peerPrefixS3    = "peerS3"
	healPrefix      = "heal"
)

func init() {
	// Static check if we exceed 255 handler ids.
	// Extend the type to uint16 when hit.
	if handlerLast > 255 {
		panic(fmt.Sprintf("out of handler IDs. %d > %d", handlerLast, 255))
	}
}

func (h HandlerID) valid() bool {
	return h != handlerInvalid && h < handlerLast
}

func (h HandlerID) isTestHandler() bool {
	return h >= handlerTest && h <= handlerTest2
}

// RemoteErr is a remote error type.
// Any error seen on a remote will be returned like this.
type RemoteErr string

// NewRemoteErr creates a new remote error.
// The error type is not preserved.
func NewRemoteErr(err error) *RemoteErr {
	if err == nil {
		return nil
	}
	r := RemoteErr(err.Error())
	return &r
}

// NewRemoteErrf creates a new remote error from a format string.
func NewRemoteErrf(format string, a ...any) *RemoteErr {
	r := RemoteErr(fmt.Sprintf(format, a...))
	return &r
}

// NewNPErr is a helper to no payload and optional remote error.
// The error type is not preserved.
func NewNPErr(err error) (NoPayload, *RemoteErr) {
	if err == nil {
		return NoPayload{}, nil
	}
	r := RemoteErr(err.Error())
	return NoPayload{}, &r
}

// NewRemoteErrString creates a new remote error from a string.
func NewRemoteErrString(msg string) *RemoteErr {
	r := RemoteErr(msg)
	return &r
}

func (r RemoteErr) Error() string {
	return string(r)
}

// Is returns if the string representation matches.
func (r *RemoteErr) Is(other error) bool {
	if r == nil || other == nil {
		return r == other
	}
	var o RemoteErr
	if errors.As(other, &o) {
		return r == &o
	}
	return false
}

// IsRemoteErr returns the value if the error is a RemoteErr.
func IsRemoteErr(err error) *RemoteErr {
	var r RemoteErr
	if errors.As(err, &r) {
		return &r
	}
	return nil
}

type (
	// SingleHandlerFn is handlers for one to one requests.
	// A non-nil error value will be returned as RemoteErr(msg) to client.
	// No client information or cancellation (deadline) is available.
	// Include this in payload if needed.
	// Payload should be recycled with PutByteBuffer if not needed after the call.
	SingleHandlerFn func(payload []byte) ([]byte, *RemoteErr)

	// StatelessHandlerFn must handle incoming stateless request.
	// A non-nil error value will be returned as RemoteErr(msg) to client.
	StatelessHandlerFn func(ctx context.Context, payload []byte, resp chan<- []byte) *RemoteErr

	// StatelessHandler is handlers for one to many requests,
	// where responses may be dropped.
	// Stateless requests provide no incoming stream and there is no flow control
	// on outgoing messages.
	StatelessHandler struct {
		Handle StatelessHandlerFn
		// OutCapacity is the output capacity on the caller.
		// If <= 0 capacity will be 1.
		OutCapacity int
	}

	// StreamHandlerFn must process a request with an optional initial payload.
	// It must keep consuming from 'in' until it returns.
	// 'in' and 'out' are independent.
	// The handler should never close out.
	// Buffers received from 'in'  can be recycled with PutByteBuffer.
	// Buffers sent on out can not be referenced once sent.
	StreamHandlerFn func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *RemoteErr

	// StreamHandler handles fully bidirectional streams,
	// There is flow control in both directions.
	StreamHandler struct {
		// Handle an incoming request. Initial payload is sent.
		// Additional input packets (if any) are streamed to request.
		// Upstream will block when request channel is full.
		// Response packets can be sent at any time.
		// Any non-nil error sent as response means no more responses are sent.
		Handle StreamHandlerFn

		// Subroute for handler.
		// Subroute must be static and clients should specify a matching subroute.
		// Should not be set unless there are different handlers for the same HandlerID.
		Subroute string

		// OutCapacity is the output capacity. If <= 0 capacity will be 1.
		OutCapacity int

		// InCapacity is the output capacity.
		// If == 0 no input is expected
		InCapacity int
	}
)

type subHandlerID [32]byte

func makeSubHandlerID(id HandlerID, subRoute string) subHandlerID {
	b := subHandlerID(sha256.Sum256([]byte(subRoute)))
	b[0] = byte(id)
	b[1] = 0 // Reserved
	return b
}

func (s subHandlerID) withHandler(id HandlerID) subHandlerID {
	s[0] = byte(id)
	s[1] = 0 // Reserved
	return s
}

func (s *subHandlerID) String() string {
	if s == nil {
		return ""
	}
	return hex.EncodeToString(s[:])
}

func makeZeroSubHandlerID(id HandlerID) subHandlerID {
	return subHandlerID{byte(id)}
}

type handlers struct {
	single    [handlerLast]SingleHandlerFn
	stateless [handlerLast]*StatelessHandler
	streams   [handlerLast]*StreamHandler

	subSingle    map[subHandlerID]SingleHandlerFn
	subStateless map[subHandlerID]*StatelessHandler
	subStreams   map[subHandlerID]*StreamHandler
}

func (h *handlers) init() {
	h.subSingle = make(map[subHandlerID]SingleHandlerFn)
	h.subStateless = make(map[subHandlerID]*StatelessHandler)
	h.subStreams = make(map[subHandlerID]*StreamHandler)
}

func (h *handlers) hasAny(id HandlerID) bool {
	if !id.valid() {
		return false
	}
	return h.single[id] != nil || h.stateless[id] != nil || h.streams[id] != nil
}

func (h *handlers) hasSubhandler(id subHandlerID) bool {
	return h.subSingle[id] != nil || h.subStateless[id] != nil || h.subStreams[id] != nil
}

// RoundTripper provides an interface for type roundtrip serialization.
type RoundTripper interface {
	msgp.Unmarshaler
	msgp.Marshaler
	msgp.Sizer

	comparable
}

// SingleHandler is a type safe handler for single roundtrip requests.
type SingleHandler[Req, Resp RoundTripper] struct {
	id            HandlerID
	sharedResp    bool
	callReuseReq  bool
	ignoreNilConn bool

	newReq  func() Req
	newResp func() Resp

	recycleReq  func(Req)
	recycleResp func(Resp)
}

func recycleFunc[RT RoundTripper](newRT func() RT) (newFn func() RT, recycle func(r RT)) {
	rAny := any(newRT())
	var rZero RT
	if _, ok := rAny.(Recycler); ok {
		return newRT, func(r RT) {
			if r != rZero {
				if rc, ok := any(r).(Recycler); ok {
					rc.Recycle()
				}
			}
		}
	}
	pool := sync.Pool{
		New: func() interface{} {
			return newRT()
		},
	}
	return func() RT { return pool.Get().(RT) },
		func(r RT) {
			if r != rZero {
				pool.Put(r)
			}
		}
}

// NewSingleHandler creates a typed handler that can provide Marshal/Unmarshal.
// Use Register to register a server handler.
// Use Call to initiate a clientside call.
func NewSingleHandler[Req, Resp RoundTripper](h HandlerID, newReq func() Req, newResp func() Resp) *SingleHandler[Req, Resp] {
	s := SingleHandler[Req, Resp]{id: h}
	s.newReq, s.recycleReq = recycleFunc[Req](newReq)
	s.newResp, s.recycleResp = recycleFunc[Resp](newResp)
	if _, ok := any(newReq()).(Recycler); ok {
		s.callReuseReq = true
	}
	return &s
}

// PutResponse will accept a response for reuse.
// This can be used by a caller to recycle a response after receiving it from a Call.
func (h *SingleHandler[Req, Resp]) PutResponse(r Resp) {
	h.recycleResp(r)
}

// AllowCallRequestPool indicates it is safe to reuse the request
// on the client side, meaning the request is recycled/pooled when a request is sent.
// CAREFUL: This should only be used when there are no pointers, slices that aren't freshly constructed.
func (h *SingleHandler[Req, Resp]) AllowCallRequestPool(b bool) *SingleHandler[Req, Resp] {
	h.callReuseReq = b
	return h
}

// IgnoreNilConn will ignore nil connections when calling.
// This will make Call return nil instead of ErrDisconnected when the connection is nil.
// This may only be set ONCE before use.
func (h *SingleHandler[Req, Resp]) IgnoreNilConn() *SingleHandler[Req, Resp] {
	if h.ignoreNilConn {
		logger.LogOnceIf(context.Background(), fmt.Errorf("%s: IgnoreNilConn called twice", h.id.String()), h.id.String()+"IgnoreNilConn")
	}
	h.ignoreNilConn = true
	return h
}

// WithSharedResponse indicates it is unsafe to reuse the response
// when it has been returned on a handler.
// This will disable automatic response recycling/pooling.
// Typically this is used when the response sharing part of its data structure.
func (h *SingleHandler[Req, Resp]) WithSharedResponse() *SingleHandler[Req, Resp] {
	h.sharedResp = true
	return h
}

// NewResponse creates a new response.
// Handlers can use this to create a reusable response.
// The response may be reused, so caller should clear any fields.
func (h *SingleHandler[Req, Resp]) NewResponse() Resp {
	return h.newResp()
}

// NewRequest creates a new request.
// Handlers can use this to create a reusable request.
// The request may be reused, so caller should clear any fields.
func (h *SingleHandler[Req, Resp]) NewRequest() Req {
	return h.newReq()
}

// Register a handler for a Req -> Resp roundtrip.
// Requests are automatically recycled.
func (h *SingleHandler[Req, Resp]) Register(m *Manager, handle func(req Req) (resp Resp, err *RemoteErr), subroute ...string) error {
	if h.newReq == nil {
		return errors.New("newReq nil in NewSingleHandler")
	}
	if h.newResp == nil {
		return errors.New("newResp nil in NewSingleHandler")
	}
	return m.RegisterSingleHandler(h.id, func(payload []byte) ([]byte, *RemoteErr) {
		req := h.NewRequest()
		_, err := req.UnmarshalMsg(payload)
		if err != nil {
			PutByteBuffer(payload)
			r := RemoteErr(err.Error())
			return nil, &r
		}
		resp, rerr := handle(req)
		h.recycleReq(req)

		if rerr != nil {
			PutByteBuffer(payload)
			return nil, rerr
		}
		payload, err = resp.MarshalMsg(payload[:0])
		if !h.sharedResp {
			h.PutResponse(resp)
		}
		if err != nil {
			PutByteBuffer(payload)
			r := RemoteErr(err.Error())
			return nil, &r
		}
		return payload, nil
	}, subroute...)
}

// Requester is able to send requests to a remote.
type Requester interface {
	Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)
}

// Call the remote with the request and return the response.
// The response should be returned with PutResponse when no error.
// If no deadline is set, a 1-minute deadline is added.
func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Req) (resp Resp, err error) {
	if c == nil {
		if h.ignoreNilConn {
			return resp, nil
		}
		return resp, ErrDisconnected
	}
	payload, err := req.MarshalMsg(GetByteBuffer()[:0])
	if err != nil {
		return resp, err
	}
	switch any(req).(type) {
	case *MSS, *URLValues:
		ctx = context.WithValue(ctx, TraceParamsKey{}, req)
	case *NoPayload, *Bytes:
		// do not need to trace nopayload and bytes payload
	default:
		ctx = context.WithValue(ctx, TraceParamsKey{}, fmt.Sprintf("type=%T", req))
	}
	if h.callReuseReq {
		defer h.recycleReq(req)
	}
	res, err := c.Request(ctx, h.id, payload)
	PutByteBuffer(payload)
	if err != nil {
		return resp, err
	}
	defer PutByteBuffer(res)
	r := h.NewResponse()
	_, err = r.UnmarshalMsg(res)
	if err != nil {
		h.PutResponse(r)
		return resp, err
	}
	return r, err
}

// RemoteClient contains information about the caller.
type RemoteClient struct {
	Name string
}

type (
	ctxCallerKey   = struct{}
	ctxSubrouteKey = struct{}
)

// GetCaller returns caller information from contexts provided to handlers.
func GetCaller(ctx context.Context) *RemoteClient {
	val, _ := ctx.Value(ctxCallerKey{}).(*RemoteClient)
	return val
}

// GetSubroute returns caller information from contexts provided to handlers.
func GetSubroute(ctx context.Context) string {
	val, _ := ctx.Value(ctxSubrouteKey{}).(string)
	return val
}

func setCaller(ctx context.Context, cl *RemoteClient) context.Context {
	return context.WithValue(ctx, ctxCallerKey{}, cl)
}

func setSubroute(ctx context.Context, s string) context.Context {
	return context.WithValue(ctx, ctxSubrouteKey{}, s)
}

// StreamTypeHandler is a type safe handler for streaming requests.
type StreamTypeHandler[Payload, Req, Resp RoundTripper] struct {
	WithPayload bool

	// Override the default capacities (1)
	OutCapacity int

	// Set to 0 if no input is expected.
	// Will be 0 if newReq is nil.
	InCapacity int

	reqPool        sync.Pool
	respPool       sync.Pool
	id             HandlerID
	newPayload     func() Payload
	nilReq         Req
	nilResp        Resp
	sharedResponse bool
}

// NewStream creates a typed handler that can provide Marshal/Unmarshal.
// Use Register to register a server handler.
// Use Call to initiate a clientside call.
// newPayload can be nil. In that case payloads will always be nil.
// newReq can be nil. In that case no input stream is expected and the handler will be called with nil 'in' channel.
func NewStream[Payload, Req, Resp RoundTripper](h HandlerID, newPayload func() Payload, newReq func() Req, newResp func() Resp) *StreamTypeHandler[Payload, Req, Resp] {
	if newResp == nil {
		panic("newResp missing in NewStream")
	}

	s := newStreamHandler[Payload, Req, Resp](h)
	if newReq != nil {
		s.reqPool.New = func() interface{} {
			return newReq()
		}
	} else {
		s.InCapacity = 0
	}
	s.respPool.New = func() interface{} {
		return newResp()
	}
	s.newPayload = newPayload
	s.WithPayload = newPayload != nil
	return s
}

// WithSharedResponse indicates it is unsafe to reuse the response.
// Typically this is used when the response sharing part of its data structure.
func (h *StreamTypeHandler[Payload, Req, Resp]) WithSharedResponse() *StreamTypeHandler[Payload, Req, Resp] {
	h.sharedResponse = true
	return h
}

// NewPayload creates a new payload.
func (h *StreamTypeHandler[Payload, Req, Resp]) NewPayload() Payload {
	return h.newPayload()
}

// NewRequest creates a new request.
// The struct may be reused, so caller should clear any fields.
func (h *StreamTypeHandler[Payload, Req, Resp]) NewRequest() Req {
	return h.reqPool.Get().(Req)
}

// PutRequest will accept a request for reuse.
// These should be returned by the handler.
func (h *StreamTypeHandler[Payload, Req, Resp]) PutRequest(r Req) {
	if r != h.nilReq {
		h.reqPool.Put(r)
	}
}

// PutResponse will accept a response for reuse.
// These should be returned by the caller.
func (h *StreamTypeHandler[Payload, Req, Resp]) PutResponse(r Resp) {
	if r != h.nilResp {
		h.respPool.Put(r)
	}
}

// NewResponse creates a new response.
// Handlers can use this to create a reusable response.
func (h *StreamTypeHandler[Payload, Req, Resp]) NewResponse() Resp {
	return h.respPool.Get().(Resp)
}

func newStreamHandler[Payload, Req, Resp RoundTripper](h HandlerID) *StreamTypeHandler[Payload, Req, Resp] {
	return &StreamTypeHandler[Payload, Req, Resp]{id: h, InCapacity: 1, OutCapacity: 1}
}

// Register a handler for two-way streaming with payload, input stream and output stream.
// An optional subroute can be given. Multiple entries are joined with '/'.
func (h *StreamTypeHandler[Payload, Req, Resp]) Register(m *Manager, handle func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error {
	return h.register(m, handle, subroute...)
}

// WithOutCapacity adjusts the output capacity from the handler perspective.
// This must be done prior to registering the handler.
func (h *StreamTypeHandler[Payload, Req, Resp]) WithOutCapacity(out int) *StreamTypeHandler[Payload, Req, Resp] {
	h.OutCapacity = out
	return h
}

// WithInCapacity adjusts the input capacity from the handler perspective.
// This must be done prior to registering the handler.
func (h *StreamTypeHandler[Payload, Req, Resp]) WithInCapacity(in int) *StreamTypeHandler[Payload, Req, Resp] {
	h.InCapacity = in
	return h
}

// RegisterNoInput a handler for one-way streaming with payload and output stream.
// An optional subroute can be given. Multiple entries are joined with '/'.
func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput(m *Manager, handle func(ctx context.Context, p Payload, out chan<- Resp) *RemoteErr, subroute ...string) error {
	h.InCapacity = 0
	return h.register(m, func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr {
		return handle(ctx, p, out)
	}, subroute...)
}

// RegisterNoPayload a handler for one-way streaming with payload and output stream.
// An optional subroute can be given. Multiple entries are joined with '/'.
func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoPayload(m *Manager, handle func(ctx context.Context, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error {
	h.WithPayload = false
	return h.register(m, func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr {
		return handle(ctx, in, out)
	}, subroute...)
}

// Register a handler for two-way streaming with optional payload and input stream.
func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func(ctx context.Context, p Payload, in <-chan Req, out chan<- Resp) *RemoteErr, subroute ...string) error {
	return m.RegisterStreamingHandler(h.id, StreamHandler{
		Handle: func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *RemoteErr {
			var plT Payload
			if h.WithPayload {
				plT = h.NewPayload()
				_, err := plT.UnmarshalMsg(payload)
				PutByteBuffer(payload)
				if err != nil {
					r := RemoteErr(err.Error())
					return &r
				}
			}

			var inT chan Req
			if h.InCapacity > 0 {
				// Don't add extra buffering
				inT = make(chan Req)
				go func() {
					defer xioutil.SafeClose(inT)
					for {
						select {
						case <-ctx.Done():
							return
						case v, ok := <-in:
							if !ok {
								return
							}
							input := h.NewRequest()
							_, err := input.UnmarshalMsg(v)
							if err != nil {
								logger.LogOnceIf(ctx, err, err.Error())
							}
							PutByteBuffer(v)
							// Send input
							select {
							case <-ctx.Done():
								return
							case inT <- input:
							}
						}
					}
				}()
			}
			outT := make(chan Resp)
			outDone := make(chan struct{})
			go func() {
				defer xioutil.SafeClose(outDone)
				dropOutput := false
				for v := range outT {
					if dropOutput {
						continue
					}
					dst := GetByteBuffer()
					dst, err := v.MarshalMsg(dst[:0])
					if err != nil {
						logger.LogOnceIf(ctx, err, err.Error())
					}
					if !h.sharedResponse {
						h.PutResponse(v)
					}
					select {
					case <-ctx.Done():
						dropOutput = true
					case out <- dst:
					}
				}
			}()
			rErr := handle(ctx, plT, inT, outT)
			xioutil.SafeClose(outT)
			<-outDone
			return rErr
		}, OutCapacity: h.OutCapacity, InCapacity: h.InCapacity, Subroute: strings.Join(subroute, "/"),
	})
}

// TypedStream is a stream with specific types.
type TypedStream[Req, Resp RoundTripper] struct {
	// responses from the remote server.
	// Channel will be closed after error or when remote closes.
	// responses *must* be read to either an error is returned or the channel is closed.
	responses *Stream
	newResp   func() Resp

	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- Req
}

// Results returns the results from the remote server one by one.
// If any error is returned by the callback, the stream will be canceled.
// If the context is canceled, the stream will be canceled.
func (s *TypedStream[Req, Resp]) Results(next func(resp Resp) error) (err error) {
	return s.responses.Results(func(b []byte) error {
		resp := s.newResp()
		_, err := resp.UnmarshalMsg(b)
		if err != nil {
			return err
		}
		return next(resp)
	})
}

// Streamer creates a stream.
type Streamer interface {
	NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)
}

// Call the remove with the request and
func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Streamer, payload Payload) (st *TypedStream[Req, Resp], err error) {
	if c == nil {
		return nil, ErrDisconnected
	}
	var payloadB []byte
	if h.WithPayload {
		var err error
		payloadB, err = payload.MarshalMsg(GetByteBuffer()[:0])
		if err != nil {
			return nil, err
		}
	}
	stream, err := c.NewStream(ctx, h.id, payloadB)
	PutByteBuffer(payloadB)
	if err != nil {
		return nil, err
	}

	// respT := make(chan TypedResponse[Resp])
	var reqT chan Req
	if h.InCapacity > 0 {
		reqT = make(chan Req)
		// Request handler
		if stream.Requests == nil {
			return nil, fmt.Errorf("internal error: stream request channel nil")
		}
		go func() {
			defer xioutil.SafeClose(stream.Requests)
			for req := range reqT {
				b, err := req.MarshalMsg(GetByteBuffer()[:0])
				if err != nil {
					logger.LogOnceIf(ctx, err, err.Error())
				}
				h.PutRequest(req)
				stream.Requests <- b
			}
		}()
	} else if stream.Requests != nil {
		xioutil.SafeClose(stream.Requests)
	}

	return &TypedStream[Req, Resp]{responses: stream, newResp: h.NewResponse, Requests: reqT}, nil
}