minio/cmd/storage-rest-server.go

1441 lines
46 KiB
Go
Raw Permalink Normal View History

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bufio"
"context"
"encoding/binary"
2019-03-18 16:07:58 -04:00
"encoding/hex"
"errors"
"fmt"
"io"
2019-03-18 16:07:58 -04:00
"net/http"
"os/user"
"path"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
"github.com/minio/minio/internal/grid"
"github.com/tinylib/msgp/msgp"
jwtreq "github.com/golang-jwt/jwt/v4/request"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/config"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
xjwt "github.com/minio/minio/internal/jwt"
"github.com/minio/minio/internal/logger"
"github.com/minio/mux"
xnet "github.com/minio/pkg/v3/net"
)
var errDiskStale = errors.New("drive stale")
// To abstract a disk over network.
type storageRESTServer struct {
endpoint Endpoint
}
var (
storageCheckPartsRPC = grid.NewSingleHandler[*CheckPartsHandlerParams, *CheckPartsResp](grid.HandlerCheckParts2, func() *CheckPartsHandlerParams { return &CheckPartsHandlerParams{} }, func() *CheckPartsResp { return &CheckPartsResp{} })
storageDeleteFileRPC = grid.NewSingleHandler[*DeleteFileHandlerParams, grid.NoPayload](grid.HandlerDeleteFile, func() *DeleteFileHandlerParams { return &DeleteFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true)
storageDeleteVersionRPC = grid.NewSingleHandler[*DeleteVersionHandlerParams, grid.NoPayload](grid.HandlerDeleteVersion, func() *DeleteVersionHandlerParams { return &DeleteVersionHandlerParams{} }, grid.NewNoPayload)
storageDiskInfoRPC = grid.NewSingleHandler[*DiskInfoOptions, *DiskInfo](grid.HandlerDiskInfo, func() *DiskInfoOptions { return &DiskInfoOptions{} }, func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse().AllowCallRequestPool(true)
storageNSScannerRPC = grid.NewStream[*nsScannerOptions, grid.NoPayload, *nsScannerResp](grid.HandlerNSScanner, func() *nsScannerOptions { return &nsScannerOptions{} }, nil, func() *nsScannerResp { return &nsScannerResp{} })
storageReadAllRPC = grid.NewSingleHandler[*ReadAllHandlerParams, *grid.Bytes](grid.HandlerReadAll, func() *ReadAllHandlerParams { return &ReadAllHandlerParams{} }, grid.NewBytes).AllowCallRequestPool(true)
storageWriteAllRPC = grid.NewSingleHandler[*WriteAllHandlerParams, grid.NoPayload](grid.HandlerWriteAll, func() *WriteAllHandlerParams { return &WriteAllHandlerParams{} }, grid.NewNoPayload)
storageReadVersionRPC = grid.NewSingleHandler[*grid.MSS, *FileInfo](grid.HandlerReadVersion, grid.NewMSS, func() *FileInfo { return &FileInfo{} })
storageReadXLRPC = grid.NewSingleHandler[*grid.MSS, *RawFileInfo](grid.HandlerReadXL, grid.NewMSS, func() *RawFileInfo { return &RawFileInfo{} })
storageRenameDataRPC = grid.NewSingleHandler[*RenameDataHandlerParams, *RenameDataResp](grid.HandlerRenameData2, func() *RenameDataHandlerParams { return &RenameDataHandlerParams{} }, func() *RenameDataResp { return &RenameDataResp{} })
storageRenameDataInlineRPC = grid.NewSingleHandler[*RenameDataInlineHandlerParams, *RenameDataResp](grid.HandlerRenameDataInline, newRenameDataInlineHandlerParams, func() *RenameDataResp { return &RenameDataResp{} }).AllowCallRequestPool(false)
storageRenameFileRPC = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams { return &RenameFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true)
storageRenamePartRPC = grid.NewSingleHandler[*RenamePartHandlerParams, grid.NoPayload](grid.HandlerRenamePart, func() *RenamePartHandlerParams { return &RenamePartHandlerParams{} }, grid.NewNoPayload)
storageStatVolRPC = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerStatVol, grid.NewMSS, func() *VolInfo { return &VolInfo{} })
storageUpdateMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerUpdateMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload)
storageWriteMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerWriteMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload)
storageListDirRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *ListDirResult](grid.HandlerListDir, grid.NewMSS, nil, func() *ListDirResult { return &ListDirResult{} }).WithOutCapacity(1)
)
func getStorageViaEndpoint(endpoint Endpoint) StorageAPI {
globalLocalDrivesMu.RLock()
defer globalLocalDrivesMu.RUnlock()
if len(globalLocalSetDrives) == 0 {
return globalLocalDrivesMap[endpoint.String()]
}
return globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx]
}
func (s *storageRESTServer) getStorage() StorageAPI {
return getStorageViaEndpoint(s.endpoint)
}
func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
err = unwrapAll(err)
switch err {
case errDiskStale:
w.WriteHeader(http.StatusPreconditionFailed)
case errFileNotFound, errFileVersionNotFound:
w.WriteHeader(http.StatusNotFound)
case errInvalidAccessKeyID, errAccessKeyDisabled, errNoAuthToken, errMalformedAuth, errAuthentication, errSkewedAuthTime:
w.WriteHeader(http.StatusUnauthorized)
case context.Canceled, context.DeadlineExceeded:
w.WriteHeader(499)
default:
w.WriteHeader(http.StatusForbidden)
}
w.Write([]byte(err.Error()))
}
// DefaultSkewTime - skew time is 15 minutes between minio peers.
const DefaultSkewTime = 15 * time.Minute
// validateStorageRequestToken will validate the token against the provided audience.
func validateStorageRequestToken(token string) error {
claims := xjwt.NewStandardClaims()
if err := xjwt.ParseWithStandardClaims(token, claims, []byte(globalActiveCred.SecretKey)); err != nil {
return errAuthentication
}
owner := claims.AccessKey == globalActiveCred.AccessKey || claims.Subject == globalActiveCred.AccessKey
if !owner {
return errAuthentication
}
return nil
}
// Authenticates storage client's requests and validates for skewed time.
func storageServerRequestValidate(r *http.Request) error {
token, err := jwtreq.AuthorizationHeaderExtractor.ExtractToken(r)
if err != nil {
if err == jwtreq.ErrNoTokenInRequest {
return errNoAuthToken
}
return errMalformedAuth
}
if err = validateStorageRequestToken(token); err != nil {
return err
}
nanoTime, err := strconv.ParseInt(r.Header.Get("X-Minio-Time"), 10, 64)
if err != nil {
return errMalformedAuth
}
localTime := UTCNow()
remoteTime := time.Unix(0, nanoTime)
delta := remoteTime.Sub(localTime)
if delta < 0 {
delta *= -1
}
if delta > DefaultSkewTime {
return errSkewedAuthTime
}
return nil
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// IsAuthValid - To authenticate and verify the time difference.
func (s *storageRESTServer) IsAuthValid(w http.ResponseWriter, r *http.Request) bool {
if s.getStorage() == nil {
s.writeErrorResponse(w, errDiskNotFound)
return false
}
if err := storageServerRequestValidate(r); err != nil {
s.writeErrorResponse(w, err)
return false
}
return true
}
// IsValid - To authenticate and check if the disk-id in the request corresponds to the underlying disk.
func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
if !s.IsAuthValid(w, r) {
return false
}
if err := r.ParseForm(); err != nil {
s.writeErrorResponse(w, err)
return false
}
diskID := r.Form.Get(storageRESTDiskID)
if diskID == "" {
// Request sent empty disk-id, we allow the request
// as the peer might be coming up and trying to read format.json
// or create format.json
return true
}
storedDiskID, err := s.getStorage().GetDiskID()
if err != nil {
s.writeErrorResponse(w, err)
return false
}
if diskID != storedDiskID {
s.writeErrorResponse(w, errDiskStale)
return false
}
// If format.json is available and request sent the right disk-id, we allow the request
return true
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// checkID - check if the disk-id in the request corresponds to the underlying disk.
func (s *storageRESTServer) checkID(wantID string) bool {
if s.getStorage() == nil {
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return false
}
if wantID == "" {
// Request sent empty disk-id, we allow the request
// as the peer might be coming up and trying to read format.json
// or create format.json
return true
}
storedDiskID, err := s.getStorage().GetDiskID()
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
if err != nil {
return false
}
return wantID == storedDiskID
}
// HealthHandler handler checks if disk is stale
func (s *storageRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {
s.IsValid(w, r)
}
// DiskInfoHandler - returns disk info.
func (s *storageRESTServer) DiskInfoHandler(opts *DiskInfoOptions) (*DiskInfo, *grid.RemoteErr) {
if !s.checkID(opts.DiskID) {
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return nil, grid.NewRemoteErr(errDiskNotFound)
}
info, err := s.getStorage().DiskInfo(context.Background(), *opts)
if err != nil {
info.Error = err.Error()
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return &info, nil
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
func (s *storageRESTServer) NSScannerHandler(ctx context.Context, params *nsScannerOptions, out chan<- *nsScannerResp) *grid.RemoteErr {
if !s.checkID(params.DiskID) {
return grid.NewRemoteErr(errDiskNotFound)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
if params.Cache == nil {
return grid.NewRemoteErrString("NSScannerHandler: provided cache is nil")
}
// Collect updates, stream them before the full cache is sent.
updates := make(chan dataUsageEntry, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for update := range updates {
resp := storageNSScannerRPC.NewResponse()
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
resp.Update = &update
out <- resp
}
}()
ui, err := s.getStorage().NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode), nil)
wg.Wait()
if err != nil {
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return grid.NewRemoteErr(err)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// Send final response.
resp := storageNSScannerRPC.NewResponse()
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
resp.Final = &ui
out <- resp
return nil
}
// MakeVolHandler - make a volume.
func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
err := s.getStorage().MakeVol(r.Context(), volume)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// MakeVolBulkHandler - create multiple volumes as a bulk operation.
func (s *storageRESTServer) MakeVolBulkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volumes := strings.Split(r.Form.Get(storageRESTVolumes), ",")
err := s.getStorage().MakeVolBulk(r.Context(), volumes...)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// StatVolHandler - stat a volume.
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
func (s *storageRESTServer) StatVolHandler(params *grid.MSS) (*VolInfo, *grid.RemoteErr) {
if !s.checkID(params.Get(storageRESTDiskID)) {
return nil, grid.NewRemoteErr(errDiskNotFound)
}
info, err := s.getStorage().StatVol(context.Background(), params.Get(storageRESTVolume))
if err != nil {
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return nil, grid.NewRemoteErr(err)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return &info, nil
}
// AppendFileHandler - append data from the request to the file specified.
func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
buf := make([]byte, r.ContentLength)
_, err := io.ReadFull(r.Body, buf)
if err != nil {
s.writeErrorResponse(w, err)
return
}
err = s.getStorage().AppendFile(r.Context(), volume, filePath, buf)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// CreateFileHandler - copy the contents from the request.
func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
origvolume := r.Form.Get(storageRESTOrigVolume)
fileSizeStr := r.Form.Get(storageRESTLength)
fileSize, err := strconv.Atoi(fileSizeStr)
if err != nil {
s.writeErrorResponse(w, err)
return
}
done, body := keepHTTPReqResponseAlive(w, r)
done(s.getStorage().CreateFile(r.Context(), origvolume, volume, filePath, int64(fileSize), body))
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// DeleteVersionHandler delete updated metadata.
func (s *storageRESTServer) DeleteVersionHandler(p *DeleteVersionHandlerParams) (np grid.NoPayload, gerr *grid.RemoteErr) {
if !s.checkID(p.DiskID) {
return np, grid.NewRemoteErr(errDiskNotFound)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
volume := p.Volume
filePath := p.FilePath
forceDelMarker := p.ForceDelMarker
opts := DeleteOptions{}
err := s.getStorage().DeleteVersion(context.Background(), volume, filePath, p.FI, forceDelMarker, opts)
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return np, grid.NewRemoteErr(err)
}
// ReadVersionHandlerWS read metadata of versionID
func (s *storageRESTServer) ReadVersionHandlerWS(params *grid.MSS) (*FileInfo, *grid.RemoteErr) {
if !s.checkID(params.Get(storageRESTDiskID)) {
return nil, grid.NewRemoteErr(errDiskNotFound)
}
origvolume := params.Get(storageRESTOrigVolume)
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
volume := params.Get(storageRESTVolume)
filePath := params.Get(storageRESTFilePath)
versionID := params.Get(storageRESTVersionID)
healing, err := strconv.ParseBool(params.Get(storageRESTHealing))
if err != nil {
return nil, grid.NewRemoteErr(err)
}
inclFreeVersions, err := strconv.ParseBool(params.Get(storageRESTInclFreeVersions))
if err != nil {
return nil, grid.NewRemoteErr(err)
}
fi, err := s.getStorage().ReadVersion(context.Background(), origvolume, volume, filePath, versionID, ReadOptions{
InclFreeVersions: inclFreeVersions,
ReadData: false,
Healing: healing,
})
if err != nil {
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return nil, grid.NewRemoteErr(err)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return &fi, nil
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// ReadVersionHandler read metadata of versionID
func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
origvolume := r.Form.Get(storageRESTOrigVolume)
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
versionID := r.Form.Get(storageRESTVersionID)
healing, err := strconv.ParseBool(r.Form.Get(storageRESTHealing))
if err != nil {
s.writeErrorResponse(w, err)
return
}
inclFreeVersions, err := strconv.ParseBool(r.Form.Get(storageRESTInclFreeVersions))
if err != nil {
s.writeErrorResponse(w, err)
return
}
fi, err := s.getStorage().ReadVersion(r.Context(), origvolume, volume, filePath, versionID, ReadOptions{
InclFreeVersions: inclFreeVersions,
ReadData: true,
Healing: healing,
})
if err != nil {
s.writeErrorResponse(w, err)
return
}
storageLogIf(r.Context(), msgp.Encode(w, &fi))
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// WriteMetadataHandler rpc handler to write new updated metadata.
func (s *storageRESTServer) WriteMetadataHandler(p *MetadataHandlerParams) (np grid.NoPayload, gerr *grid.RemoteErr) {
if !s.checkID(p.DiskID) {
return grid.NewNPErr(errDiskNotFound)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
volume := p.Volume
filePath := p.FilePath
origvolume := p.OrigVolume
err := s.getStorage().WriteMetadata(context.Background(), origvolume, volume, filePath, p.FI)
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
return np, grid.NewRemoteErr(err)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// UpdateMetadataHandler update new updated metadata.
func (s *storageRESTServer) UpdateMetadataHandler(p *MetadataHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
if !s.checkID(p.DiskID) {
return grid.NewNPErr(errDiskNotFound)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
volume := p.Volume
filePath := p.FilePath
return grid.NewNPErr(s.getStorage().UpdateMetadata(context.Background(), volume, filePath, p.FI, p.UpdateOpts))
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// CheckPartsHandler - check if a file metadata exists.
func (s *storageRESTServer) CheckPartsHandler(p *CheckPartsHandlerParams) (*CheckPartsResp, *grid.RemoteErr) {
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
if !s.checkID(p.DiskID) {
return nil, grid.NewRemoteErr(errDiskNotFound)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
volume := p.Volume
filePath := p.FilePath
resp, err := s.getStorage().CheckParts(context.Background(), volume, filePath, p.FI)
return resp, grid.NewRemoteErr(err)
}
func (s *storageRESTServer) WriteAllHandler(p *WriteAllHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
if !s.checkID(p.DiskID) {
return grid.NewNPErr(errDiskNotFound)
}
volume := p.Volume
filePath := p.FilePath
return grid.NewNPErr(s.getStorage().WriteAll(context.Background(), volume, filePath, p.Buf))
}
// ReadAllHandler - read all the contents of a file.
func (s *storageRESTServer) ReadAllHandler(p *ReadAllHandlerParams) (*grid.Bytes, *grid.RemoteErr) {
if !s.checkID(p.DiskID) {
return nil, grid.NewRemoteErr(errDiskNotFound)
}
volume := p.Volume
filePath := p.FilePath
buf, err := s.getStorage().ReadAll(context.Background(), volume, filePath)
return grid.NewBytesWith(buf), grid.NewRemoteErr(err)
}
// ReadXLHandler - read xl.meta for an object at path.
func (s *storageRESTServer) ReadXLHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
rf, err := s.getStorage().ReadXL(r.Context(), volume, filePath, true)
if err != nil {
s.writeErrorResponse(w, err)
return
}
storageLogIf(r.Context(), msgp.Encode(w, &rf))
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// ReadXLHandlerWS - read xl.meta for an object at path.
func (s *storageRESTServer) ReadXLHandlerWS(params *grid.MSS) (*RawFileInfo, *grid.RemoteErr) {
if !s.checkID(params.Get(storageRESTDiskID)) {
return nil, grid.NewRemoteErr(errDiskNotFound)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
volume := params.Get(storageRESTVolume)
filePath := params.Get(storageRESTFilePath)
rf, err := s.getStorage().ReadXL(context.Background(), volume, filePath, false)
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
if err != nil {
return nil, grid.NewRemoteErr(err)
}
return &rf, nil
}
// ReadPartsHandler - read section of a file.
func (s *storageRESTServer) ReadPartsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
var preq ReadPartsReq
if err := msgp.Decode(r.Body, &preq); err != nil {
s.writeErrorResponse(w, err)
return
}
done := keepHTTPResponseAlive(w)
infos, err := s.getStorage().ReadParts(r.Context(), volume, preq.Paths...)
done(nil)
if err != nil {
s.writeErrorResponse(w, err)
return
}
presp := &ReadPartsResp{Infos: infos}
storageLogIf(r.Context(), msgp.Encode(w, presp))
}
// ReadFileHandler - read section of a file.
func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
offset, err := strconv.Atoi(r.Form.Get(storageRESTOffset))
if err != nil {
s.writeErrorResponse(w, err)
return
}
length, err := strconv.Atoi(r.Form.Get(storageRESTLength))
if err != nil {
s.writeErrorResponse(w, err)
return
}
if offset < 0 || length < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
var verifier *BitrotVerifier
if r.Form.Get(storageRESTBitrotAlgo) != "" {
hashStr := r.Form.Get(storageRESTBitrotHash)
var hash []byte
hash, err = hex.DecodeString(hashStr)
if err != nil {
s.writeErrorResponse(w, err)
return
}
verifier = NewBitrotVerifier(BitrotAlgorithmFromString(r.Form.Get(storageRESTBitrotAlgo)), hash)
}
buf := make([]byte, length)
defer metaDataPoolPut(buf) // Reuse if we can.
_, err = s.getStorage().ReadFile(r.Context(), volume, filePath, int64(offset), buf, verifier)
if err != nil {
s.writeErrorResponse(w, err)
return
}
w.Header().Set(xhttp.ContentLength, strconv.Itoa(len(buf)))
w.Write(buf)
}
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// ReadFileStreamHandler - read section of a file.
func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
offset, err := strconv.ParseInt(r.Form.Get(storageRESTOffset), 10, 64)
if err != nil {
s.writeErrorResponse(w, err)
return
}
length, err := strconv.ParseInt(r.Form.Get(storageRESTLength), 10, 64)
if err != nil {
s.writeErrorResponse(w, err)
return
}
rc, err := s.getStorage().ReadFileStream(r.Context(), volume, filePath, offset, length)
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer rc.Close()
_, err = xioutil.Copy(w, rc)
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
storageLogIf(r.Context(), err)
}
}
// ListDirHandler - list a directory.
func (s *storageRESTServer) ListDirHandler(ctx context.Context, params *grid.MSS, out chan<- *ListDirResult) *grid.RemoteErr {
if !s.checkID(params.Get(storageRESTDiskID)) {
return grid.NewRemoteErr(errDiskNotFound)
}
volume := params.Get(storageRESTVolume)
dirPath := params.Get(storageRESTDirPath)
origvolume := params.Get(storageRESTOrigVolume)
count, err := strconv.Atoi(params.Get(storageRESTCount))
if err != nil {
return grid.NewRemoteErr(err)
}
entries, err := s.getStorage().ListDir(ctx, origvolume, volume, dirPath, count)
if err != nil {
return grid.NewRemoteErr(err)
}
out <- &ListDirResult{Entries: entries}
return nil
}
// DeleteFileHandler - delete a file.
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
func (s *storageRESTServer) DeleteFileHandler(p *DeleteFileHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
if !s.checkID(p.DiskID) {
return grid.NewNPErr(errDiskNotFound)
}
return grid.NewNPErr(s.getStorage().Delete(context.Background(), p.Volume, p.FilePath, p.Opts))
}
// DeleteVersionsHandler - delete a set of a versions.
func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
totalVersions, err := strconv.Atoi(r.Form.Get(storageRESTTotalVersions))
if err != nil {
s.writeErrorResponse(w, err)
return
}
versions := make([]FileInfoVersions, totalVersions)
decoder := msgpNewReader(r.Body)
defer readMsgpReaderPoolPut(decoder)
for i := 0; i < totalVersions; i++ {
dst := &versions[i]
if err := dst.DecodeMsg(decoder); err != nil {
s.writeErrorResponse(w, err)
return
}
}
done := keepHTTPResponseAlive(w)
opts := DeleteOptions{}
errs := s.getStorage().DeleteVersions(r.Context(), volume, versions, opts)
done(nil)
dErrsResp := &DeleteVersionsErrsResp{Errs: make([]string, totalVersions)}
for idx := range versions {
if errs[idx] != nil {
dErrsResp.Errs[idx] = errs[idx].Error()
}
}
buf, _ := dErrsResp.MarshalMsg(nil)
w.Write(buf)
}
// RenameDataHandler - renames a meta object and data dir to destination.
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*RenameDataResp, *grid.RemoteErr) {
if !s.checkID(p.DiskID) {
return nil, grid.NewRemoteErr(errDiskNotFound)
}
resp, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath, p.Opts)
return &resp, grid.NewRemoteErr(err)
}
// RenameDataInlineHandler - renames a meta object and data dir to destination.
func (s *storageRESTServer) RenameDataInlineHandler(p *RenameDataInlineHandlerParams) (*RenameDataResp, *grid.RemoteErr) {
defer p.Recycle()
return s.RenameDataHandler(&p.RenameDataHandlerParams)
}
// RenameFileHandler - rename a file from source to destination
func (s *storageRESTServer) RenameFileHandler(p *RenameFileHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
if !s.checkID(p.DiskID) {
return grid.NewNPErr(errDiskNotFound)
}
return grid.NewNPErr(s.getStorage().RenameFile(context.Background(), p.SrcVolume, p.SrcFilePath, p.DstVolume, p.DstFilePath))
}
// RenamePartHandler - rename a multipart part from source to destination
func (s *storageRESTServer) RenamePartHandler(p *RenamePartHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
if !s.checkID(p.DiskID) {
return grid.NewNPErr(errDiskNotFound)
}
return grid.NewNPErr(s.getStorage().RenamePart(context.Background(), p.SrcVolume, p.SrcFilePath, p.DstVolume, p.DstFilePath, p.Meta))
}
// CleanAbandonedDataHandler - Clean unused data directories.
func (s *storageRESTServer) CleanAbandonedDataHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
if volume == "" || filePath == "" {
return // Ignore
}
keepHTTPResponseAlive(w)(s.getStorage().CleanAbandonedData(r.Context(), volume, filePath))
}
// closeNotifier is itself a ReadCloser that will notify when either an error occurs or
// the Close() function is called.
type closeNotifier struct {
rc io.ReadCloser
done chan struct{}
}
func (c *closeNotifier) Read(p []byte) (n int, err error) {
n, err = c.rc.Read(p)
if err != nil {
if c.done != nil {
xioutil.SafeClose(c.done)
c.done = nil
}
}
return n, err
}
func (c *closeNotifier) Close() error {
if c.done != nil {
xioutil.SafeClose(c.done)
c.done = nil
}
return c.rc.Close()
}
// keepHTTPReqResponseAlive can be used to avoid timeouts with long storage
// operations, such as bitrot verification or data usage scanning.
// Every 10 seconds a space character is sent.
// keepHTTPReqResponseAlive will wait for the returned body to be read before starting the ticker.
// The returned function should always be called to release resources.
// An optional error can be sent which will be picked as text only error,
// without its original type by the receiver.
// waitForHTTPResponse should be used to the receiving side.
func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func(error), body io.ReadCloser) {
bodyDoneCh := make(chan struct{})
doneCh := make(chan error)
ctx := r.Context()
go func() {
canWrite := true
write := func(b []byte) {
if canWrite {
n, err := w.Write(b)
if err != nil || n != len(b) {
canWrite = false
}
}
}
// Wait for body to be read.
select {
case <-ctx.Done():
case <-bodyDoneCh:
case err := <-doneCh:
if err != nil {
write([]byte{1})
write([]byte(err.Error()))
} else {
write([]byte{0})
}
xioutil.SafeClose(doneCh)
return
}
defer xioutil.SafeClose(doneCh)
// Initiate ticker after body has been read.
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// The done() might have been called
// concurrently, check for it before we
// write the filler byte.
select {
case err := <-doneCh:
if err != nil {
write([]byte{1})
write([]byte(err.Error()))
} else {
write([]byte{0})
}
return
default:
}
// Response not ready, write a filler byte.
write([]byte{32})
if canWrite {
w.(http.Flusher).Flush()
}
case err := <-doneCh:
if err != nil {
write([]byte{1})
write([]byte(err.Error()))
} else {
write([]byte{0})
}
return
}
}
}()
return func(err error) {
if doneCh == nil {
return
}
// Indicate we are ready to write.
doneCh <- err
// Wait for channel to be closed so we don't race on writes.
<-doneCh
// Clear so we can be called multiple times without crashing.
doneCh = nil
}, &closeNotifier{rc: r.Body, done: bodyDoneCh}
}
// keepHTTPResponseAlive can be used to avoid timeouts with long storage
// operations, such as bitrot verification or data usage scanning.
// keepHTTPResponseAlive may NOT be used until the request body has been read,
// use keepHTTPReqResponseAlive instead.
// Every 10 seconds a space character is sent.
// The returned function should always be called to release resources.
// An optional error can be sent which will be picked as text only error,
// without its original type by the receiver.
// waitForHTTPResponse should be used to the receiving side.
func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
doneCh := make(chan error)
go func() {
canWrite := true
write := func(b []byte) {
if canWrite {
n, err := w.Write(b)
if err != nil || n != len(b) {
canWrite = false
}
}
}
defer xioutil.SafeClose(doneCh)
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// The done() might have been called
// concurrently, check for it before we
// write the filler byte.
select {
case err := <-doneCh:
if err != nil {
write([]byte{1})
write([]byte(err.Error()))
} else {
write([]byte{0})
}
return
default:
}
// Response not ready, write a filler byte.
write([]byte{32})
if canWrite {
w.(http.Flusher).Flush()
}
case err := <-doneCh:
if err != nil {
write([]byte{1})
write([]byte(err.Error()))
} else {
write([]byte{0})
}
return
}
}
}()
return func(err error) {
if doneCh == nil {
return
}
// Indicate we are ready to write.
doneCh <- err
// Wait for channel to be closed so we don't race on writes.
<-doneCh
// Clear so we can be called multiple times without crashing.
doneCh = nil
}
}
// waitForHTTPResponse will wait for responses where keepHTTPResponseAlive
// has been used.
// The returned reader contains the payload.
func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
reader := bufio.NewReader(respBody)
for {
b, err := reader.ReadByte()
if err != nil {
return nil, err
}
// Check if we have a response ready or a filler byte.
switch b {
case 0:
return reader, nil
case 1:
2022-09-19 14:05:16 -04:00
errorText, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
return nil, errors.New(string(errorText))
case 32:
continue
default:
return nil, fmt.Errorf("unexpected filler byte: %d", b)
}
}
}
// httpStreamResponse allows streaming a response, but still send an error.
type httpStreamResponse struct {
done chan error
block chan []byte
err error
}
// Write part of the streaming response.
// Note that upstream errors are currently not forwarded, but may be in the future.
func (h *httpStreamResponse) Write(b []byte) (int, error) {
if len(b) == 0 || h.err != nil {
// Ignore 0 length blocks
return 0, h.err
}
tmp := make([]byte, len(b))
copy(tmp, b)
h.block <- tmp
return len(b), h.err
}
// CloseWithError will close the stream and return the specified error.
// This can be done several times, but only the first error will be sent.
// After calling this the stream should not be written to.
func (h *httpStreamResponse) CloseWithError(err error) {
if h.done == nil {
return
}
h.done <- err
h.err = err
// Indicates that the response is done.
<-h.done
h.done = nil
}
// streamHTTPResponse can be used to avoid timeouts with long storage
// operations, such as bitrot verification or data usage scanning.
// Every 10 seconds a space character is sent.
// The returned function should always be called to release resources.
// An optional error can be sent which will be picked as text only error,
// without its original type by the receiver.
// waitForHTTPStream should be used to the receiving side.
func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse {
doneCh := make(chan error)
blockCh := make(chan []byte)
h := httpStreamResponse{done: doneCh, block: blockCh}
go func() {
canWrite := true
write := func(b []byte) {
if canWrite {
n, err := w.Write(b)
if err != nil || n != len(b) {
canWrite = false
}
}
}
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Response not ready, write a filler byte.
write([]byte{32})
if canWrite {
w.(http.Flusher).Flush()
}
case err := <-doneCh:
if err != nil {
write([]byte{1})
write([]byte(err.Error()))
} else {
write([]byte{0})
}
xioutil.SafeClose(doneCh)
return
case block := <-blockCh:
var tmp [5]byte
tmp[0] = 2
binary.LittleEndian.PutUint32(tmp[1:], uint32(len(block)))
write(tmp[:])
write(block)
if canWrite {
w.(http.Flusher).Flush()
}
}
}
}()
return &h
}
var poolBuf8k = sync.Pool{
New: func() interface{} {
b := make([]byte, 8192)
return &b
},
}
var poolBuf128k = sync.Pool{
New: func() interface{} {
b := make([]byte, 128<<10)
return b
},
}
// waitForHTTPStream will wait for responses where
// streamHTTPResponse has been used.
// The returned reader contains the payload and must be closed if no error is returned.
func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
var tmp [1]byte
// 8K copy buffer, reused for less allocs...
bufp := poolBuf8k.Get().(*[]byte)
buf := *bufp
defer poolBuf8k.Put(bufp)
for {
_, err := io.ReadFull(respBody, tmp[:])
if err != nil {
return err
}
// Check if we have a response ready or a filler byte.
switch tmp[0] {
case 0:
// 0 is unbuffered, copy the rest.
_, err := io.CopyBuffer(w, respBody, buf)
if err == io.EOF {
return nil
}
return err
case 1:
2022-09-19 14:05:16 -04:00
errorText, err := io.ReadAll(respBody)
if err != nil {
return err
}
return errors.New(string(errorText))
case 2:
// Block of data
var tmp [4]byte
_, err := io.ReadFull(respBody, tmp[:])
if err != nil {
return err
}
length := binary.LittleEndian.Uint32(tmp[:])
n, err := io.CopyBuffer(w, io.LimitReader(respBody, int64(length)), buf)
if err != nil {
return err
}
if n != int64(length) {
return io.ErrUnexpectedEOF
}
continue
case 32:
continue
default:
return fmt.Errorf("unexpected filler byte: %d", tmp[0])
}
}
}
// VerifyFileHandler - Verify all part of file for bitrot errors.
func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
var fi FileInfo
if err := msgp.Decode(r.Body, &fi); err != nil {
s.writeErrorResponse(w, err)
return
}
done := keepHTTPResponseAlive(w)
resp, err := s.getStorage().VerifyFile(r.Context(), volume, filePath, fi)
done(err)
if err != nil {
return
}
buf, _ := resp.MarshalMsg(nil)
w.Write(buf)
}
func checkDiskFatalErrs(errs []error) error {
// This returns a common error if all errors are
// same errors, then there is no point starting
// the server.
if countErrs(errs, errUnsupportedDisk) == len(errs) {
return errUnsupportedDisk
}
if countErrs(errs, errDiskAccessDenied) == len(errs) {
return errDiskAccessDenied
}
if countErrs(errs, errFileAccessDenied) == len(errs) {
return errDiskAccessDenied
}
if countErrs(errs, errDiskNotDir) == len(errs) {
return errDiskNotDir
}
if countErrs(errs, errFaultyDisk) == len(errs) {
return errFaultyDisk
}
if countErrs(errs, errXLBackend) == len(errs) {
return errXLBackend
}
return nil
}
// A single function to write certain errors to be fatal
// or informative based on the `exit` flag, please look
// at each implementation of error for added hints.
//
// FIXME: This is an unusual function but serves its purpose for
// now, need to revisit the overall erroring structure here.
// Do not like it :-(
func logFatalErrs(err error, endpoint Endpoint, exit bool) {
switch {
case errors.Is(err, errXLBackend):
logger.Fatal(config.ErrInvalidXLValue(err), "Unable to initialize backend")
case errors.Is(err, errUnsupportedDisk):
var hint string
if endpoint.URL != nil {
hint = fmt.Sprintf("Drive '%s' does not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support", endpoint.Path)
} else {
hint = "Drives do not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support"
}
logger.Fatal(config.ErrUnsupportedBackend(err).Hint("%s", hint), "Unable to initialize backend")
case errors.Is(err, errDiskNotDir):
var hint string
if endpoint.URL != nil {
hint = fmt.Sprintf("Drive '%s' is not a directory, MinIO erasure coding needs a directory", endpoint.Path)
} else {
hint = "Drives are not directories, MinIO erasure coding needs directories"
}
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint("%s", hint), "Unable to initialize backend")
case errors.Is(err, errDiskAccessDenied):
// Show a descriptive error with a hint about how to fix it.
var username string
if u, err := user.Current(); err == nil {
username = u.Username
} else {
username = "<your-username>"
}
var hint string
if endpoint.URL != nil {
hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s %s && sudo chmod u+rxw %s`",
username, endpoint.Path, endpoint.Path)
} else {
hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s. <path> && sudo chmod u+rxw <path>`", username)
}
if !exit {
storageLogOnceIf(GlobalContext, fmt.Errorf("Drive is not writable %s, %s", endpoint, hint), "log-fatal-errs")
} else {
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint("%s", hint), "Unable to initialize backend")
}
case errors.Is(err, errFaultyDisk):
if !exit {
storageLogOnceIf(GlobalContext, fmt.Errorf("Drive is faulty at %s, please replace the drive - drive will be offline", endpoint), "log-fatal-errs")
} else {
logger.Fatal(err, "Unable to initialize backend")
}
case errors.Is(err, errDiskFull):
if !exit {
storageLogOnceIf(GlobalContext, fmt.Errorf("Drive is already full at %s, incoming I/O will fail - drive will be offline", endpoint), "log-fatal-errs")
} else {
logger.Fatal(err, "Unable to initialize backend")
}
case errors.Is(err, errInconsistentDisk):
if exit {
logger.Fatal(err, "Unable to initialize backend")
}
default:
if !exit {
storageLogOnceIf(GlobalContext, fmt.Errorf("Drive %s returned an unexpected error: %w, please investigate - drive will be offline", endpoint, err), "log-fatal-errs")
} else {
logger.Fatal(err, "Unable to initialize backend")
}
}
}
// StatInfoFile returns file stat info.
func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
glob := r.Form.Get(storageRESTGlob)
done := keepHTTPResponseAlive(w)
stats, err := s.getStorage().StatInfoFile(r.Context(), volume, filePath, glob == "true")
done(err)
if err != nil {
return
}
Add admin inspect Glob support (#13328) * Add admin Glob support Allow returning multiple files on inspect calls. ``` λ mc admin inspect --json local2/testbucket/nyc-taxi-data-10M.csv.zst/* ... λ unzip -l inspect.5f0643b2.zip Archive: inspect.5f0643b2.zip Length Date Time Name --------- ---------- ----- ---- 0 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/ 802 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/xl.meta 0 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/ 802 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/xl.meta 0 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/ 802 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/xl.meta 0 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/ 802 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/xl.meta --------- ------- 3208 8 files ``` Using fully recursive: ``` λ mc admin inspect local2/testbucket/nyc-taxi-data-10M.csv.zst/** ... Archive: inspect.79c261cb.zip Length Date Time Name --------- ---------- ----- ---- 0 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/ 0 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/ 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.1 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.10 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.11 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.12 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.13 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.14 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.15 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.16 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.17 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.18 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.19 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.2 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.20 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.21 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.22 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.23 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.24 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.25 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.26 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.27 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.28 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.29 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.3 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.30 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.31 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.32 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.33 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.34 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.35 3439368 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.36 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.4 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.5 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.6 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.7 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.8 4194816 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.9 802 2021-09-03 12:50 192.168.1.78:9001/a221edde-48fe-45f5-ad32-3bc7131c7659/testbucket/nyc-taxi-data-10M.csv.zst/xl.meta 0 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/ 0 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/ 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.1 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.10 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.11 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.12 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.13 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.14 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.15 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.16 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.17 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.18 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.19 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.2 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.20 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.21 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.22 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.23 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.24 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.25 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.26 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.27 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.28 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.29 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.3 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.30 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.31 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.32 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.33 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.34 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.35 3439368 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.36 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.4 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.5 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.6 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.7 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.8 4194816 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.9 802 2021-09-03 12:50 192.168.1.78:9001/cb7440ef-f0d9-42a8-b137-f00f519276ca/testbucket/nyc-taxi-data-10M.csv.zst/xl.meta 0 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/ 0 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/ 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.1 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.10 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.11 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.12 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.13 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.14 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.15 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.16 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.17 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.18 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.19 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.2 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.20 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.21 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.22 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.23 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.24 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.25 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.26 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.27 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.28 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.29 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.3 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.30 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.31 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.32 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.33 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.34 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.35 3439368 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.36 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.4 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.5 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.6 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.7 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.8 4194816 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.9 802 2021-09-03 12:50 192.168.1.78:9001/759cd5ac-7860-4cf3-acad-a375fcbae338/testbucket/nyc-taxi-data-10M.csv.zst/xl.meta 0 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/ 0 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/ 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.1 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.10 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.11 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.12 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.13 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.14 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.15 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.16 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.17 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.18 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.19 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.2 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.20 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.21 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.22 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.23 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.24 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.25 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.26 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.27 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.28 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.29 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.3 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.30 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.31 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.32 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.33 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.34 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.35 3439368 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.36 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.4 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.5 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.6 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.7 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.8 4194816 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/18a50b3e-3c56-418e-a045-ad5c58c1d44b/part.9 802 2021-09-09 15:56 192.168.1.78:9001/2b48619c-c2fa-4e69-839e-58fc82c1b43e/testbucket/nyc-taxi-data-10M.csv.zst/xl.meta --------- ------- 601034920 156 files ``` Furthermore allow `inspect` to do direct decode from `mc`, for example: ``` λ mc admin inspect --json local2/testbucket/nyc-taxi-data-10M.csv.zst/*|inspect -json Output decrypted to inspect.5f0643b2.zip ``` - Correct error, forward non-EOF errors. - Add some extra safety. Log FNF when no files. - Add `xl-meta` zip support. For `xl-meta` multiple inputs output object with names as key. Automatically switches `xl-meta` to single-line output when multiple objects. Add double-star wildcard support to xl-meta input. Co-authored-by: Harshavardhana <harsha@minio.io>
2021-10-01 14:50:00 -04:00
for _, si := range stats {
msgp.Encode(w, &si)
}
}
func (s *storageRESTServer) DeleteBulkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
var req DeleteBulkReq
mr := msgpNewReader(r.Body)
defer readMsgpReaderPoolPut(mr)
if err := req.DecodeMsg(mr); err != nil {
s.writeErrorResponse(w, err)
return
}
volume := r.Form.Get(storageRESTVolume)
keepHTTPResponseAlive(w)(s.getStorage().DeleteBulk(r.Context(), volume, req.Paths...))
}
// ReadMultiple returns multiple files
func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
rw := streamHTTPResponse(w)
defer func() {
if r := recover(); r != nil {
debug.PrintStack()
rw.CloseWithError(fmt.Errorf("panic: %v", r))
}
}()
var req ReadMultipleReq
mr := msgpNewReader(r.Body)
defer readMsgpReaderPoolPut(mr)
err := req.DecodeMsg(mr)
if err != nil {
rw.CloseWithError(err)
return
}
mw := msgp.NewWriter(rw)
responses := make(chan ReadMultipleResp, len(req.Files))
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for resp := range responses {
err := resp.EncodeMsg(mw)
if err != nil {
rw.CloseWithError(err)
return
}
mw.Flush()
}
}()
err = s.getStorage().ReadMultiple(r.Context(), req, responses)
wg.Wait()
rw.CloseWithError(err)
}
// globalLocalSetDrives is used for local drive as well as remote REST
// API caller for other nodes to talk to this node.
//
// Any updates to this must be serialized via globalLocalDrivesMu (locker)
var globalLocalSetDrives [][][]StorageAPI
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// registerStorageRESTHandlers - register storage rpc router.
func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools, gm *grid.Manager) {
h := func(f http.HandlerFunc) http.HandlerFunc {
return collectInternodeStats(httpTraceHdrs(f))
}
globalLocalDrivesMap = make(map[string]StorageAPI)
globalLocalSetDrives = make([][][]StorageAPI, len(endpointServerPools))
for pool := range globalLocalSetDrives {
globalLocalSetDrives[pool] = make([][]StorageAPI, endpointServerPools[pool].SetCount)
for set := range globalLocalSetDrives[pool] {
globalLocalSetDrives[pool][set] = make([]StorageAPI, endpointServerPools[pool].DrivesPerSet)
}
}
for _, serverPool := range endpointServerPools {
for _, endpoint := range serverPool.Endpoints {
if !endpoint.IsLocal {
continue
2019-11-19 20:42:27 -05:00
}
server := &storageRESTServer{
endpoint: endpoint,
}
2019-11-19 20:42:27 -05:00
subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter()
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodHealth).HandlerFunc(h(server.HealthHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodAppendFile).HandlerFunc(h(server.AppendFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCreateFile).HandlerFunc(h(server.CreateFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(h(server.DeleteVersionsHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(h(server.VerifyFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(h(server.StatInfoFile))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteBulk).HandlerFunc(h(server.DeleteBulkHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadParts).HandlerFunc(h(server.ReadPartsHandler))
subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler))
subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(h(server.ReadVersionHandler))
subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadXL).HandlerFunc(h(server.ReadXLHandler))
subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadFile).HandlerFunc(h(server.ReadFileHandler))
logger.FatalIf(storageListDirRPC.RegisterNoInput(gm, server.ListDirHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageReadAllRPC.Register(gm, server.ReadAllHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageWriteAllRPC.Register(gm, server.WriteAllHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageRenameFileRPC.Register(gm, server.RenameFileHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageRenamePartRPC.Register(gm, server.RenamePartHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageRenameDataRPC.Register(gm, server.RenameDataHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageRenameDataInlineRPC.Register(gm, server.RenameDataInlineHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageDeleteFileRPC.Register(gm, server.DeleteFileHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageCheckPartsRPC.Register(gm, server.CheckPartsHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageReadVersionRPC.Register(gm, server.ReadVersionHandlerWS, endpoint.Path), "unable to register handler")
logger.FatalIf(storageWriteMetadataRPC.Register(gm, server.WriteMetadataHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageUpdateMetadataRPC.Register(gm, server.UpdateMetadataHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageDeleteVersionRPC.Register(gm, server.DeleteVersionHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageReadXLRPC.Register(gm, server.ReadXLHandlerWS, endpoint.Path), "unable to register handler")
logger.FatalIf(storageNSScannerRPC.RegisterNoInput(gm, server.NSScannerHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageDiskInfoRPC.Register(gm, server.DiskInfoHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageStatVolRPC.Register(gm, server.StatVolHandler, endpoint.Path), "unable to register handler")
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. * `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
logger.FatalIf(gm.RegisterStreamingHandler(grid.HandlerWalkDir, grid.StreamHandler{
Subroute: endpoint.Path,
Handle: server.WalkDirHandler,
OutCapacity: 1,
}), "unable to register handler")
createStorage := func(endpoint Endpoint) bool {
xl, err := newXLStorage(endpoint, false)
if err != nil {
// if supported errors don't fail, we proceed to
// printing message and moving forward.
if errors.Is(err, errDriveIsRoot) {
err = fmt.Errorf("major: %v: minor: %v: %w", xl.major, xl.minor, err)
}
logFatalErrs(err, endpoint, false)
return false
}
storage := newXLStorageDiskIDCheck(xl, true)
storage.SetDiskID(xl.diskID)
// We do not have to do SetFormatData() since 'xl'
// already captures formatData cached.
globalLocalDrivesMu.Lock()
defer globalLocalDrivesMu.Unlock()
globalLocalDrivesMap[endpoint.String()] = storage
globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx] = storage
return true
}
if createStorage(endpoint) {
continue
}
// Start async goroutine to create storage.
go func(endpoint Endpoint) {
for {
time.Sleep(3 * time.Second)
if createStorage(endpoint) {
return
}
}
}(endpoint)
}
}
}