mirror of https://github.com/minio/minio.git
663 lines
16 KiB
Go
663 lines
16 KiB
Go
// Copyright (c) 2015-2023 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package grid
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
xioutil "github.com/minio/minio/internal/ioutil"
|
|
"github.com/zeebo/xxh3"
|
|
)
|
|
|
|
// muxClient is a stateful connection to a remote.
|
|
type muxClient struct {
|
|
MuxID uint64
|
|
SendSeq, RecvSeq uint32
|
|
LastPong int64
|
|
BaseFlags Flags
|
|
ctx context.Context
|
|
cancelFn context.CancelCauseFunc
|
|
parent *Connection
|
|
respWait chan<- Response
|
|
respMu sync.Mutex
|
|
singleResp bool
|
|
closed bool
|
|
stateless bool
|
|
acked bool
|
|
init bool
|
|
deadline time.Duration
|
|
outBlock chan struct{}
|
|
subroute *subHandlerID
|
|
respErr atomic.Pointer[error]
|
|
clientPingInterval time.Duration
|
|
}
|
|
|
|
// Response is a response from the server.
|
|
type Response struct {
|
|
Msg []byte
|
|
Err error
|
|
}
|
|
|
|
func newMuxClient(ctx context.Context, muxID uint64, parent *Connection) *muxClient {
|
|
ctx, cancelFn := context.WithCancelCause(ctx)
|
|
return &muxClient{
|
|
MuxID: muxID,
|
|
ctx: ctx,
|
|
cancelFn: cancelFn,
|
|
parent: parent,
|
|
LastPong: time.Now().UnixNano(),
|
|
BaseFlags: parent.baseFlags,
|
|
clientPingInterval: parent.clientPingInterval,
|
|
}
|
|
}
|
|
|
|
// roundtrip performs a roundtrip, returning the first response.
|
|
// This cannot be used concurrently.
|
|
func (m *muxClient) roundtrip(h HandlerID, req []byte) ([]byte, error) {
|
|
if m.init {
|
|
return nil, errors.New("mux client already used")
|
|
}
|
|
m.init = true
|
|
m.singleResp = true
|
|
msg := message{
|
|
Op: OpRequest,
|
|
MuxID: m.MuxID,
|
|
Handler: h,
|
|
Flags: m.BaseFlags | FlagEOF,
|
|
Payload: req,
|
|
DeadlineMS: uint32(m.deadline.Milliseconds()),
|
|
}
|
|
if m.subroute != nil {
|
|
msg.Flags |= FlagSubroute
|
|
}
|
|
ch := make(chan Response, 1)
|
|
m.respMu.Lock()
|
|
if m.closed {
|
|
m.respMu.Unlock()
|
|
return nil, ErrDisconnected
|
|
}
|
|
m.respWait = ch
|
|
m.respMu.Unlock()
|
|
ctx := m.ctx
|
|
|
|
// Add deadline if none.
|
|
if msg.DeadlineMS == 0 {
|
|
msg.DeadlineMS = uint32(defaultSingleRequestTimeout / time.Millisecond)
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithTimeout(ctx, defaultSingleRequestTimeout)
|
|
defer cancel()
|
|
}
|
|
// Send request
|
|
if err := m.send(msg); err != nil {
|
|
return nil, err
|
|
}
|
|
if debugReqs {
|
|
fmt.Println(m.MuxID, m.parent.String(), "SEND")
|
|
}
|
|
// Wait for response or context.
|
|
select {
|
|
case v, ok := <-ch:
|
|
if !ok {
|
|
return nil, ErrDisconnected
|
|
}
|
|
if debugReqs && v.Err != nil {
|
|
v.Err = fmt.Errorf("%d %s RESP ERR: %w", m.MuxID, m.parent.String(), v.Err)
|
|
}
|
|
return v.Msg, v.Err
|
|
case <-ctx.Done():
|
|
if debugReqs {
|
|
return nil, fmt.Errorf("%d %s ERR: %w", m.MuxID, m.parent.String(), context.Cause(ctx))
|
|
}
|
|
return nil, context.Cause(ctx)
|
|
}
|
|
}
|
|
|
|
// send the message. msg.Seq and msg.MuxID will be set
|
|
func (m *muxClient) send(msg message) error {
|
|
m.respMu.Lock()
|
|
defer m.respMu.Unlock()
|
|
if m.closed {
|
|
return errors.New("mux client closed")
|
|
}
|
|
return m.sendLocked(msg)
|
|
}
|
|
|
|
// sendLocked the message. msg.Seq and msg.MuxID will be set.
|
|
// m.respMu must be held.
|
|
func (m *muxClient) sendLocked(msg message) error {
|
|
dst := GetByteBufferCap(msg.Msgsize())
|
|
msg.Seq = m.SendSeq
|
|
msg.MuxID = m.MuxID
|
|
msg.Flags |= m.BaseFlags
|
|
if debugPrint {
|
|
fmt.Println("Client sending", &msg, "to", m.parent.Remote)
|
|
}
|
|
m.SendSeq++
|
|
|
|
dst, err := msg.MarshalMsg(dst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if msg.Flags&FlagSubroute != 0 {
|
|
if m.subroute == nil {
|
|
return fmt.Errorf("internal error: subroute not defined on client")
|
|
}
|
|
hid := m.subroute.withHandler(msg.Handler)
|
|
before := len(dst)
|
|
dst = append(dst, hid[:]...)
|
|
if debugPrint {
|
|
fmt.Println("Added subroute", hid.String(), "to message", msg, "len", len(dst)-before)
|
|
}
|
|
}
|
|
if msg.Flags&FlagCRCxxh3 != 0 {
|
|
h := xxh3.Hash(dst)
|
|
dst = binary.LittleEndian.AppendUint32(dst, uint32(h))
|
|
}
|
|
return m.parent.send(m.ctx, dst)
|
|
}
|
|
|
|
// RequestStateless will send a single payload request and stream back results.
|
|
// req may not be read/written to after calling.
|
|
// TODO: Not implemented
|
|
func (m *muxClient) RequestStateless(h HandlerID, req []byte, out chan<- Response) {
|
|
if m.init {
|
|
out <- Response{Err: errors.New("mux client already used")}
|
|
}
|
|
m.init = true
|
|
|
|
// Try to grab an initial block.
|
|
m.singleResp = false
|
|
msg := message{
|
|
Op: OpConnectMux,
|
|
Handler: h,
|
|
Flags: FlagEOF,
|
|
Payload: req,
|
|
DeadlineMS: uint32(m.deadline.Milliseconds()),
|
|
}
|
|
msg.setZeroPayloadFlag()
|
|
if m.subroute != nil {
|
|
msg.Flags |= FlagSubroute
|
|
}
|
|
|
|
// Send...
|
|
err := m.send(msg)
|
|
if err != nil {
|
|
out <- Response{Err: err}
|
|
return
|
|
}
|
|
|
|
// Route directly to output.
|
|
m.respWait = out
|
|
}
|
|
|
|
// RequestStream will send a single payload request and stream back results.
|
|
// 'requests' can be nil, in which case only req is sent as input.
|
|
// It will however take less resources.
|
|
func (m *muxClient) RequestStream(h HandlerID, payload []byte, requests chan []byte, responses chan Response) (*Stream, error) {
|
|
if m.init {
|
|
return nil, errors.New("mux client already used")
|
|
}
|
|
if responses == nil {
|
|
return nil, errors.New("RequestStream: responses channel is nil")
|
|
}
|
|
m.init = true
|
|
m.respMu.Lock()
|
|
if m.closed {
|
|
m.respMu.Unlock()
|
|
return nil, ErrDisconnected
|
|
}
|
|
m.respWait = responses // Route directly to output.
|
|
m.respMu.Unlock()
|
|
|
|
// Try to grab an initial block.
|
|
m.singleResp = false
|
|
m.RecvSeq = m.SendSeq // Sync
|
|
if cap(requests) > 0 {
|
|
m.outBlock = make(chan struct{}, cap(requests))
|
|
}
|
|
msg := message{
|
|
Op: OpConnectMux,
|
|
Handler: h,
|
|
Payload: payload,
|
|
DeadlineMS: uint32(m.deadline.Milliseconds()),
|
|
}
|
|
msg.setZeroPayloadFlag()
|
|
if requests == nil {
|
|
msg.Flags |= FlagEOF
|
|
}
|
|
if m.subroute != nil {
|
|
msg.Flags |= FlagSubroute
|
|
}
|
|
|
|
// Send...
|
|
err := m.send(msg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if debugPrint {
|
|
fmt.Println("Connecting Mux", m.MuxID, ",to", m.parent.Remote)
|
|
}
|
|
|
|
// Space for one message and an error.
|
|
responseCh := make(chan Response, 1)
|
|
|
|
// Spawn simple disconnect
|
|
if requests == nil {
|
|
go m.handleOneWayStream(responseCh, responses)
|
|
return &Stream{responses: responseCh, Requests: nil, ctx: m.ctx, cancel: m.cancelFn, muxID: m.MuxID}, nil
|
|
}
|
|
|
|
// Deliver responses and send unblocks back to the server.
|
|
go m.handleTwowayResponses(responseCh, responses)
|
|
go m.handleTwowayRequests(responses, requests)
|
|
|
|
return &Stream{responses: responseCh, Requests: requests, ctx: m.ctx, cancel: m.cancelFn, muxID: m.MuxID}, nil
|
|
}
|
|
|
|
func (m *muxClient) addErrorNonBlockingClose(respHandler chan<- Response, err error) {
|
|
m.respMu.Lock()
|
|
defer m.respMu.Unlock()
|
|
if !m.closed {
|
|
m.respErr.Store(&err)
|
|
// Do not block.
|
|
select {
|
|
case respHandler <- Response{Err: err}:
|
|
xioutil.SafeClose(respHandler)
|
|
default:
|
|
go func() {
|
|
respHandler <- Response{Err: err}
|
|
xioutil.SafeClose(respHandler)
|
|
}()
|
|
}
|
|
gridLogIf(m.ctx, m.sendLocked(message{Op: OpDisconnectServerMux, MuxID: m.MuxID}))
|
|
m.closed = true
|
|
}
|
|
}
|
|
|
|
// respHandler
|
|
func (m *muxClient) handleOneWayStream(respHandler chan<- Response, respServer <-chan Response) {
|
|
if debugPrint {
|
|
start := time.Now()
|
|
defer func() {
|
|
fmt.Println("Mux", m.MuxID, "Request took", time.Since(start).Round(time.Millisecond))
|
|
}()
|
|
}
|
|
defer func() {
|
|
// addErrorNonBlockingClose will close the response channel
|
|
// - maybe async, so we shouldn't do it here.
|
|
if m.respErr.Load() == nil {
|
|
xioutil.SafeClose(respHandler)
|
|
}
|
|
}()
|
|
var pingTimer <-chan time.Time
|
|
if m.deadline == 0 || m.deadline > m.clientPingInterval {
|
|
ticker := time.NewTicker(m.clientPingInterval)
|
|
defer ticker.Stop()
|
|
pingTimer = ticker.C
|
|
atomic.StoreInt64(&m.LastPong, time.Now().UnixNano())
|
|
}
|
|
defer m.parent.deleteMux(false, m.MuxID)
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
if debugPrint {
|
|
fmt.Println("Client sending disconnect to mux", m.MuxID)
|
|
}
|
|
err := context.Cause(m.ctx)
|
|
if !errors.Is(err, errStreamEOF) {
|
|
m.addErrorNonBlockingClose(respHandler, err)
|
|
}
|
|
return
|
|
case resp, ok := <-respServer:
|
|
if !ok {
|
|
return
|
|
}
|
|
sendResp:
|
|
select {
|
|
case respHandler <- resp:
|
|
m.respMu.Lock()
|
|
if !m.closed {
|
|
gridLogIf(m.ctx, m.sendLocked(message{Op: OpUnblockSrvMux, MuxID: m.MuxID}))
|
|
}
|
|
m.respMu.Unlock()
|
|
case <-m.ctx.Done():
|
|
// Client canceled. Don't block.
|
|
// Next loop will catch it.
|
|
case <-pingTimer:
|
|
if !m.doPing(respHandler) {
|
|
return
|
|
}
|
|
goto sendResp
|
|
}
|
|
case <-pingTimer:
|
|
if !m.doPing(respHandler) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// doPing checks last ping time and sends another ping.
|
|
func (m *muxClient) doPing(respHandler chan<- Response) (ok bool) {
|
|
m.respMu.Lock()
|
|
if m.closed {
|
|
m.respMu.Unlock()
|
|
// Already closed. This is not an error state;
|
|
// we may just be delivering the last responses.
|
|
return true
|
|
}
|
|
|
|
// Only check ping when not closed.
|
|
if got := time.Since(time.Unix(0, atomic.LoadInt64(&m.LastPong))); got > m.clientPingInterval*2 {
|
|
m.respMu.Unlock()
|
|
if debugPrint {
|
|
fmt.Printf("Mux %d: last pong %v ago, disconnecting\n", m.MuxID, got)
|
|
}
|
|
m.addErrorNonBlockingClose(respHandler, ErrDisconnected)
|
|
return false
|
|
}
|
|
|
|
// Send new ping
|
|
err := m.sendLocked(message{Op: OpPing, MuxID: m.MuxID})
|
|
m.respMu.Unlock()
|
|
if err != nil {
|
|
m.addErrorNonBlockingClose(respHandler, err)
|
|
}
|
|
return err == nil
|
|
}
|
|
|
|
// responseCh is the channel to that goes to the requester.
|
|
// internalResp is the channel that comes from the server.
|
|
func (m *muxClient) handleTwowayResponses(responseCh chan<- Response, internalResp <-chan Response) {
|
|
defer func() {
|
|
m.parent.deleteMux(false, m.MuxID)
|
|
// addErrorNonBlockingClose will close the response channel.
|
|
xioutil.SafeClose(responseCh)
|
|
}()
|
|
|
|
// Cancelation and errors are handled by handleTwowayRequests below.
|
|
for resp := range internalResp {
|
|
m.send(message{Op: OpUnblockSrvMux, MuxID: m.MuxID})
|
|
responseCh <- resp
|
|
}
|
|
}
|
|
|
|
func (m *muxClient) handleTwowayRequests(errResp chan<- Response, requests <-chan []byte) {
|
|
var errState bool
|
|
if debugPrint {
|
|
start := time.Now()
|
|
defer func() {
|
|
fmt.Println("Mux", m.MuxID, "Request took", time.Since(start).Round(time.Millisecond))
|
|
}()
|
|
}
|
|
|
|
var pingTimer <-chan time.Time
|
|
if m.deadline == 0 || m.deadline > m.clientPingInterval {
|
|
ticker := time.NewTicker(m.clientPingInterval)
|
|
defer ticker.Stop()
|
|
pingTimer = ticker.C
|
|
atomic.StoreInt64(&m.LastPong, time.Now().UnixNano())
|
|
}
|
|
|
|
// Listen for client messages.
|
|
reqLoop:
|
|
for !errState {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
if debugPrint {
|
|
fmt.Println("Client sending disconnect to mux", m.MuxID)
|
|
}
|
|
m.addErrorNonBlockingClose(errResp, context.Cause(m.ctx))
|
|
errState = true
|
|
continue
|
|
case <-pingTimer:
|
|
if !m.doPing(errResp) {
|
|
errState = true
|
|
continue
|
|
}
|
|
case req, ok := <-requests:
|
|
if !ok {
|
|
// Done send EOF
|
|
if debugPrint {
|
|
fmt.Println("Client done, sending EOF to mux", m.MuxID)
|
|
}
|
|
msg := message{
|
|
Op: OpMuxClientMsg,
|
|
MuxID: m.MuxID,
|
|
Flags: FlagEOF,
|
|
}
|
|
msg.setZeroPayloadFlag()
|
|
err := m.send(msg)
|
|
if err != nil {
|
|
m.addErrorNonBlockingClose(errResp, err)
|
|
}
|
|
break reqLoop
|
|
}
|
|
// Grab a send token.
|
|
sendReq:
|
|
select {
|
|
case <-m.ctx.Done():
|
|
m.addErrorNonBlockingClose(errResp, context.Cause(m.ctx))
|
|
errState = true
|
|
continue
|
|
case <-pingTimer:
|
|
if !m.doPing(errResp) {
|
|
errState = true
|
|
continue
|
|
}
|
|
goto sendReq
|
|
case <-m.outBlock:
|
|
}
|
|
msg := message{
|
|
Op: OpMuxClientMsg,
|
|
MuxID: m.MuxID,
|
|
Seq: 1,
|
|
Payload: req,
|
|
}
|
|
msg.setZeroPayloadFlag()
|
|
err := m.send(msg)
|
|
PutByteBuffer(req)
|
|
if err != nil {
|
|
m.addErrorNonBlockingClose(errResp, err)
|
|
errState = true
|
|
continue
|
|
}
|
|
msg.Seq++
|
|
}
|
|
}
|
|
|
|
if errState {
|
|
// Drain requests.
|
|
for {
|
|
select {
|
|
case r, ok := <-requests:
|
|
if !ok {
|
|
return
|
|
}
|
|
PutByteBuffer(r)
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
for !errState {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
if debugPrint {
|
|
fmt.Println("Client sending disconnect to mux", m.MuxID)
|
|
}
|
|
m.addErrorNonBlockingClose(errResp, context.Cause(m.ctx))
|
|
return
|
|
case <-pingTimer:
|
|
errState = !m.doPing(errResp)
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkSeq will check if sequence number is correct and increment it by 1.
|
|
func (m *muxClient) checkSeq(seq uint32) (ok bool) {
|
|
if seq != m.RecvSeq {
|
|
if debugPrint {
|
|
fmt.Printf("MuxID: %d client, expected sequence %d, got %d\n", m.MuxID, m.RecvSeq, seq)
|
|
}
|
|
m.addResponse(Response{Err: ErrIncorrectSequence})
|
|
return false
|
|
}
|
|
m.RecvSeq++
|
|
return true
|
|
}
|
|
|
|
// response will send handleIncoming response to client.
|
|
// may never block.
|
|
// Should return whether the next call would block.
|
|
func (m *muxClient) response(seq uint32, r Response) {
|
|
if debugReqs {
|
|
fmt.Println(m.MuxID, m.parent.String(), "RESP")
|
|
}
|
|
if debugPrint {
|
|
fmt.Printf("mux %d: got msg seqid %d, payload length: %d, err:%v\n", m.MuxID, seq, len(r.Msg), r.Err)
|
|
}
|
|
if !m.checkSeq(seq) {
|
|
if debugReqs {
|
|
fmt.Println(m.MuxID, m.parent.String(), "CHECKSEQ FAIL", m.RecvSeq, seq)
|
|
}
|
|
PutByteBuffer(r.Msg)
|
|
r.Msg = nil
|
|
r.Err = ErrIncorrectSequence
|
|
m.addResponse(r)
|
|
return
|
|
}
|
|
atomic.StoreInt64(&m.LastPong, time.Now().UnixNano())
|
|
ok := m.addResponse(r)
|
|
if !ok {
|
|
PutByteBuffer(r.Msg)
|
|
}
|
|
}
|
|
|
|
var errStreamEOF = errors.New("stream EOF")
|
|
|
|
// error is a message from the server to disconnect.
|
|
func (m *muxClient) error(err RemoteErr) {
|
|
if debugPrint {
|
|
fmt.Printf("mux %d: got remote err:%v\n", m.MuxID, string(err))
|
|
}
|
|
m.addResponse(Response{Err: &err})
|
|
}
|
|
|
|
func (m *muxClient) ack(seq uint32) {
|
|
if !m.checkSeq(seq) {
|
|
return
|
|
}
|
|
if m.acked || m.outBlock == nil {
|
|
return
|
|
}
|
|
available := cap(m.outBlock)
|
|
for i := 0; i < available; i++ {
|
|
m.outBlock <- struct{}{}
|
|
}
|
|
m.acked = true
|
|
}
|
|
|
|
func (m *muxClient) unblockSend(seq uint32) {
|
|
if !m.checkSeq(seq) {
|
|
return
|
|
}
|
|
select {
|
|
case m.outBlock <- struct{}{}:
|
|
default:
|
|
gridLogIf(m.ctx, errors.New("output unblocked overflow"))
|
|
}
|
|
}
|
|
|
|
func (m *muxClient) pong(msg pongMsg) {
|
|
if msg.NotFound || msg.Err != nil {
|
|
err := errors.New("remote terminated call")
|
|
if msg.Err != nil {
|
|
err = fmt.Errorf("remove pong failed: %v", &msg.Err)
|
|
}
|
|
m.addResponse(Response{Err: err})
|
|
return
|
|
}
|
|
atomic.StoreInt64(&m.LastPong, time.Now().UnixNano())
|
|
}
|
|
|
|
// addResponse will add a response to the response channel.
|
|
// This function will never block
|
|
func (m *muxClient) addResponse(r Response) (ok bool) {
|
|
m.respMu.Lock()
|
|
defer m.respMu.Unlock()
|
|
if m.closed {
|
|
return false
|
|
}
|
|
select {
|
|
case m.respWait <- r:
|
|
if r.Err != nil {
|
|
if debugPrint {
|
|
fmt.Println("Closing mux", m.MuxID, "due to error:", r.Err)
|
|
}
|
|
m.closeLocked()
|
|
}
|
|
return true
|
|
default:
|
|
if m.stateless {
|
|
// Drop message if not stateful.
|
|
return
|
|
}
|
|
err := errors.New("INTERNAL ERROR: Response was blocked")
|
|
gridLogIf(m.ctx, err)
|
|
m.closeLocked()
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (m *muxClient) close() {
|
|
if debugPrint {
|
|
fmt.Println("closing outgoing mux", m.MuxID)
|
|
}
|
|
if !m.respMu.TryLock() {
|
|
// Cancel before locking - will unblock any pending sends.
|
|
if m.cancelFn != nil {
|
|
m.cancelFn(context.Canceled)
|
|
}
|
|
// Wait for senders to release.
|
|
m.respMu.Lock()
|
|
}
|
|
|
|
defer m.respMu.Unlock()
|
|
m.closeLocked()
|
|
}
|
|
|
|
func (m *muxClient) closeLocked() {
|
|
if m.closed {
|
|
return
|
|
}
|
|
// We hold the lock, so nobody can modify m.respWait while we're closing.
|
|
if m.respWait != nil {
|
|
xioutil.SafeClose(m.respWait)
|
|
m.respWait = nil
|
|
}
|
|
m.closed = true
|
|
}
|