Reduce big message RPC allocations (#19390)

Use `ODirectPoolSmall` buffers for inline data in PutObject.

Add a separate call for inline data that will fetch a buffer for the inline data before unmarshal.
This commit is contained in:
Klaus Post 2024-04-01 16:42:09 -07:00 committed by GitHub
parent 06929258bc
commit b435806d91
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 366 additions and 65 deletions

View File

@ -19,6 +19,19 @@ extend-ignore-re = [
[default.extend-words] [default.extend-words]
"encrypter" = "encrypter" "encrypter" = "encrypter"
"requestor" = "requestor" "requestor" = "requestor"
"KMS" = "KMS"
"kms" = "kms"
"Kms" = "Kms"
"Dur" = "Dur"
"EOF" = "EOF"
"hd" = "hd"
"ws" = "ws"
"guid" = "guid"
"lst" = "lst"
"pn" = "pn"
"Iy" = "Iy"
"ro" = "ro"
"thr" = "thr"
[default.extend-identifiers] [default.extend-identifiers]
"bui" = "bui" "bui" = "bui"

View File

@ -29,6 +29,7 @@ import (
"time" "time"
"github.com/minio/madmin-go/v3" "github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/grid"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/sync/errgroup" "github.com/minio/pkg/v2/sync/errgroup"
) )
@ -539,7 +540,10 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
} }
partPath := pathJoin(tmpID, dstDataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(tmpID, dstDataDir, fmt.Sprintf("part.%d", partNumber))
if len(inlineBuffers) > 0 { if len(inlineBuffers) > 0 {
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(latestMeta.Size)+32)) buf := grid.GetByteBufferCap(int(erasure.ShardFileSize(latestMeta.Size)) + 64)
inlineBuffers[i] = bytes.NewBuffer(buf[:0])
defer grid.PutByteBuffer(buf)
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
} else { } else {
writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, partPath, writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, partPath,

View File

@ -41,6 +41,7 @@ import (
"github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/grid"
"github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http" xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil" xioutil "github.com/minio/minio/internal/ioutil"
@ -1147,7 +1148,6 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
buffer = buffer[:fi.Erasure.BlockSize] buffer = buffer[:fi.Erasure.BlockSize]
} }
shardFileSize := erasure.ShardFileSize(data.Size())
writers := make([]io.Writer, len(onlineDisks)) writers := make([]io.Writer, len(onlineDisks))
inlineBuffers := make([]*bytes.Buffer, len(onlineDisks)) inlineBuffers := make([]*bytes.Buffer, len(onlineDisks))
for i, disk := range onlineDisks { for i, disk := range onlineDisks {
@ -1155,7 +1155,9 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
continue continue
} }
if disk.IsOnline() { if disk.IsOnline() {
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, shardFileSize)) buf := grid.GetByteBufferCap(int(erasure.ShardFileSize(data.Size())) + 64)
inlineBuffers[i] = bytes.NewBuffer(buf[:0])
defer grid.PutByteBuffer(buf)
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
} }
} }
@ -1435,11 +1437,9 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
} }
if len(inlineBuffers) > 0 { if len(inlineBuffers) > 0 {
sz := shardFileSize buf := grid.GetByteBufferCap(int(shardFileSize) + 64)
if sz < 0 { inlineBuffers[i] = bytes.NewBuffer(buf[:0])
sz = data.ActualSize() defer grid.PutByteBuffer(buf)
}
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, sz))
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
continue continue
} }
@ -1448,7 +1448,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
} }
toEncode := io.Reader(data) toEncode := io.Reader(data)
if data.Size() > bigFileThreshold { if data.Size() >= bigFileThreshold {
// We use 2 buffers, so we always have a full buffer of input. // We use 2 buffers, so we always have a full buffer of input.
bufA := globalBytePoolCap.Get() bufA := globalBytePoolCap.Get()
bufB := globalBytePoolCap.Get() bufB := globalBytePoolCap.Get()

View File

@ -21,6 +21,8 @@ import (
"time" "time"
"github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/grid"
xioutil "github.com/minio/minio/internal/ioutil"
) )
//go:generate msgp -file=$GOFILE //go:generate msgp -file=$GOFILE
@ -433,6 +435,27 @@ type RenameDataHandlerParams struct {
Opts RenameOptions `msg:"ro"` Opts RenameOptions `msg:"ro"`
} }
// RenameDataInlineHandlerParams are parameters for RenameDataHandler with a buffer for inline data.
// The embedded params are serialized under the msgp key "p". Having a distinct
// type lets the RPC layer register a separate handler whose constructor
// (newRenameDataInlineHandlerParams) pre-allocates a pooled buffer for
// FI.Data before unmarshaling, avoiding a fresh allocation per call.
type RenameDataInlineHandlerParams struct {
RenameDataHandlerParams `msg:"p"`
}
// newRenameDataInlineHandlerParams constructs params whose FI.Data points at a
// pooled, zero-length buffer (16 KiB + header slack) so the decoder can
// unmarshal inline data into reused memory.
func newRenameDataInlineHandlerParams() *RenameDataInlineHandlerParams {
	const prealloc = 32 + 16<<10
	data := grid.GetByteBufferCap(prealloc)
	inner := RenameDataHandlerParams{FI: FileInfo{Data: data[:0]}}
	return &RenameDataInlineHandlerParams{inner}
}
// Recycle will reuse the memory allocated for the FileInfo data.
// Safe to call on a nil receiver; buffers below the small block size are
// assumed not to come from the pool and are left for the GC.
func (r *RenameDataInlineHandlerParams) Recycle() {
	if r == nil {
		return
	}
	if cap(r.FI.Data) < xioutil.BlockSizeSmall {
		// Too small to be a pooled buffer; nothing to return.
		return
	}
	grid.PutByteBuffer(r.FI.Data)
	r.FI.Data = nil
}
// RenameFileHandlerParams are parameters for RenameFileHandler. // RenameFileHandlerParams are parameters for RenameFileHandler.
type RenameFileHandlerParams struct { type RenameFileHandlerParams struct {
DiskID string `msg:"id"` DiskID string `msg:"id"`

View File

@ -4626,6 +4626,113 @@ func (z *RenameDataHandlerParams) Msgsize() (s int) {
return return
} }
// DecodeMsg implements msgp.Decodable
// Code generated by msgp (do not hand-edit): reads the one-entry map written
// by EncodeMsg and delegates key "p" to the embedded RenameDataHandlerParams;
// unknown keys are skipped for forward compatibility.
func (z *RenameDataInlineHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "p":
err = z.RenameDataHandlerParams.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "RenameDataHandlerParams")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
// Code generated by msgp (do not hand-edit): writes a one-entry map whose
// key "p" holds the embedded RenameDataHandlerParams.
func (z *RenameDataInlineHandlerParams) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "p"
err = en.Append(0x81, 0xa1, 0x70)
if err != nil {
return
}
err = z.RenameDataHandlerParams.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "RenameDataHandlerParams")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
// Code generated by msgp (do not hand-edit): appends the one-entry map
// (key "p" -> embedded params) to b and returns the extended slice.
func (z *RenameDataInlineHandlerParams) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "p"
o = append(o, 0x81, 0xa1, 0x70)
o, err = z.RenameDataHandlerParams.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "RenameDataHandlerParams")
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
// Code generated by msgp (do not hand-edit): zero-copy variant of DecodeMsg;
// consumes the map from bts and returns the remaining bytes in o.
func (z *RenameDataInlineHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "p":
bts, err = z.RenameDataHandlerParams.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "RenameDataHandlerParams")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
// Code generated by msgp: 1 byte map header + 2 bytes for the "p" key,
// plus the embedded params' own estimate.
func (z *RenameDataInlineHandlerParams) Msgsize() (s int) {
s = 1 + 2 + z.RenameDataHandlerParams.Msgsize()
return
}
// DecodeMsg implements msgp.Decodable // DecodeMsg implements msgp.Decodable
func (z *RenameDataResp) DecodeMsg(dc *msgp.Reader) (err error) { func (z *RenameDataResp) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte var field []byte

View File

@ -2156,6 +2156,119 @@ func BenchmarkDecodeRenameDataHandlerParams(b *testing.B) {
} }
} }
// TestMarshalUnmarshalRenameDataInlineHandlerParams is generated by msgp:
// round-trips a zero value through MarshalMsg/UnmarshalMsg and verifies
// both Unmarshal and Skip consume the buffer exactly.
func TestMarshalUnmarshalRenameDataInlineHandlerParams(t *testing.T) {
v := RenameDataInlineHandlerParams{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
// BenchmarkMarshalMsgRenameDataInlineHandlerParams is generated by msgp:
// measures MarshalMsg with a nil destination (one allocation per op).
func BenchmarkMarshalMsgRenameDataInlineHandlerParams(b *testing.B) {
v := RenameDataInlineHandlerParams{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
// BenchmarkAppendMsgRenameDataInlineHandlerParams is generated by msgp:
// measures MarshalMsg when appending into a pre-sized, reused buffer.
func BenchmarkAppendMsgRenameDataInlineHandlerParams(b *testing.B) {
v := RenameDataInlineHandlerParams{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
// BenchmarkUnmarshalRenameDataInlineHandlerParams is generated by msgp:
// measures UnmarshalMsg over a fixed serialized value.
func BenchmarkUnmarshalRenameDataInlineHandlerParams(b *testing.B) {
v := RenameDataInlineHandlerParams{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
// TestEncodeDecodeRenameDataInlineHandlerParams is generated by msgp:
// round-trips a zero value through the streaming Encode/Decode path and
// checks Msgsize is a valid upper bound and Skip works on the stream.
func TestEncodeDecodeRenameDataInlineHandlerParams(t *testing.T) {
v := RenameDataInlineHandlerParams{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeRenameDataInlineHandlerParams Msgsize() is inaccurate")
}
vn := RenameDataInlineHandlerParams{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
// BenchmarkEncodeRenameDataInlineHandlerParams is generated by msgp:
// measures streaming EncodeMsg to a discarding writer.
func BenchmarkEncodeRenameDataInlineHandlerParams(b *testing.B) {
v := RenameDataInlineHandlerParams{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
// BenchmarkDecodeRenameDataInlineHandlerParams is generated by msgp:
// measures streaming DecodeMsg from an endlessly repeating reader.
func BenchmarkDecodeRenameDataInlineHandlerParams(b *testing.B) {
v := RenameDataInlineHandlerParams{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalRenameDataResp(t *testing.T) { func TestMarshalUnmarshalRenameDataResp(t *testing.T) {
v := RenameDataResp{} v := RenameDataResp{}
bts, err := v.MarshalMsg(nil) bts, err := v.MarshalMsg(nil)

View File

@ -468,7 +468,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
// RenameData - rename source path to destination path atomically, metadata and data file. // RenameData - rename source path to destination path atomically, metadata and data file.
func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) { func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) {
resp, err := storageRenameDataRPC.Call(ctx, client.gridConn, &RenameDataHandlerParams{ params := RenameDataHandlerParams{
DiskID: client.diskID, DiskID: client.diskID,
SrcVolume: srcVolume, SrcVolume: srcVolume,
SrcPath: srcPath, SrcPath: srcPath,
@ -476,10 +476,17 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
DstVolume: dstVolume, DstVolume: dstVolume,
FI: fi, FI: fi,
Opts: opts, Opts: opts,
}) }
var resp *RenameDataResp
if fi.Data == nil {
resp, err = storageRenameDataRPC.Call(ctx, client.gridConn, &params)
} else {
resp, err = storageRenameDataInlineRPC.Call(ctx, client.gridConn, &RenameDataInlineHandlerParams{params})
}
if err != nil { if err != nil {
return 0, toStorageErr(err) return 0, toStorageErr(err)
} }
defer storageRenameDataRPC.PutResponse(resp) defer storageRenameDataRPC.PutResponse(resp)
return resp.Signature, nil return resp.Signature, nil
} }

View File

@ -58,21 +58,22 @@ type storageRESTServer struct {
} }
var ( var (
storageCheckPartsRPC = grid.NewSingleHandler[*CheckPartsHandlerParams, grid.NoPayload](grid.HandlerCheckParts, func() *CheckPartsHandlerParams { return &CheckPartsHandlerParams{} }, grid.NewNoPayload) storageCheckPartsRPC = grid.NewSingleHandler[*CheckPartsHandlerParams, grid.NoPayload](grid.HandlerCheckParts, func() *CheckPartsHandlerParams { return &CheckPartsHandlerParams{} }, grid.NewNoPayload)
storageDeleteFileRPC = grid.NewSingleHandler[*DeleteFileHandlerParams, grid.NoPayload](grid.HandlerDeleteFile, func() *DeleteFileHandlerParams { return &DeleteFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true) storageDeleteFileRPC = grid.NewSingleHandler[*DeleteFileHandlerParams, grid.NoPayload](grid.HandlerDeleteFile, func() *DeleteFileHandlerParams { return &DeleteFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true)
storageDeleteVersionRPC = grid.NewSingleHandler[*DeleteVersionHandlerParams, grid.NoPayload](grid.HandlerDeleteVersion, func() *DeleteVersionHandlerParams { return &DeleteVersionHandlerParams{} }, grid.NewNoPayload) storageDeleteVersionRPC = grid.NewSingleHandler[*DeleteVersionHandlerParams, grid.NoPayload](grid.HandlerDeleteVersion, func() *DeleteVersionHandlerParams { return &DeleteVersionHandlerParams{} }, grid.NewNoPayload)
storageDiskInfoRPC = grid.NewSingleHandler[*DiskInfoOptions, *DiskInfo](grid.HandlerDiskInfo, func() *DiskInfoOptions { return &DiskInfoOptions{} }, func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse().AllowCallRequestPool(true) storageDiskInfoRPC = grid.NewSingleHandler[*DiskInfoOptions, *DiskInfo](grid.HandlerDiskInfo, func() *DiskInfoOptions { return &DiskInfoOptions{} }, func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse().AllowCallRequestPool(true)
storageNSScannerRPC = grid.NewStream[*nsScannerOptions, grid.NoPayload, *nsScannerResp](grid.HandlerNSScanner, func() *nsScannerOptions { return &nsScannerOptions{} }, nil, func() *nsScannerResp { return &nsScannerResp{} }) storageNSScannerRPC = grid.NewStream[*nsScannerOptions, grid.NoPayload, *nsScannerResp](grid.HandlerNSScanner, func() *nsScannerOptions { return &nsScannerOptions{} }, nil, func() *nsScannerResp { return &nsScannerResp{} })
storageReadAllRPC = grid.NewSingleHandler[*ReadAllHandlerParams, *grid.Bytes](grid.HandlerReadAll, func() *ReadAllHandlerParams { return &ReadAllHandlerParams{} }, grid.NewBytes).AllowCallRequestPool(true) storageReadAllRPC = grid.NewSingleHandler[*ReadAllHandlerParams, *grid.Bytes](grid.HandlerReadAll, func() *ReadAllHandlerParams { return &ReadAllHandlerParams{} }, grid.NewBytes).AllowCallRequestPool(true)
storageWriteAllRPC = grid.NewSingleHandler[*WriteAllHandlerParams, grid.NoPayload](grid.HandlerWriteAll, func() *WriteAllHandlerParams { return &WriteAllHandlerParams{} }, grid.NewNoPayload) storageWriteAllRPC = grid.NewSingleHandler[*WriteAllHandlerParams, grid.NoPayload](grid.HandlerWriteAll, func() *WriteAllHandlerParams { return &WriteAllHandlerParams{} }, grid.NewNoPayload)
storageReadVersionRPC = grid.NewSingleHandler[*grid.MSS, *FileInfo](grid.HandlerReadVersion, grid.NewMSS, func() *FileInfo { return &FileInfo{} }) storageReadVersionRPC = grid.NewSingleHandler[*grid.MSS, *FileInfo](grid.HandlerReadVersion, grid.NewMSS, func() *FileInfo { return &FileInfo{} })
storageReadXLRPC = grid.NewSingleHandler[*grid.MSS, *RawFileInfo](grid.HandlerReadXL, grid.NewMSS, func() *RawFileInfo { return &RawFileInfo{} }) storageReadXLRPC = grid.NewSingleHandler[*grid.MSS, *RawFileInfo](grid.HandlerReadXL, grid.NewMSS, func() *RawFileInfo { return &RawFileInfo{} })
storageRenameDataRPC = grid.NewSingleHandler[*RenameDataHandlerParams, *RenameDataResp](grid.HandlerRenameData, func() *RenameDataHandlerParams { return &RenameDataHandlerParams{} }, func() *RenameDataResp { return &RenameDataResp{} }) storageRenameDataRPC = grid.NewSingleHandler[*RenameDataHandlerParams, *RenameDataResp](grid.HandlerRenameData, func() *RenameDataHandlerParams { return &RenameDataHandlerParams{} }, func() *RenameDataResp { return &RenameDataResp{} })
storageRenameFileRPC = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams { return &RenameFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true) storageRenameDataInlineRPC = grid.NewSingleHandler[*RenameDataInlineHandlerParams, *RenameDataResp](grid.HandlerRenameDataInline, newRenameDataInlineHandlerParams, func() *RenameDataResp { return &RenameDataResp{} }).AllowCallRequestPool(false)
storageStatVolRPC = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerStatVol, grid.NewMSS, func() *VolInfo { return &VolInfo{} }) storageRenameFileRPC = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams { return &RenameFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true)
storageUpdateMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerUpdateMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload) storageStatVolRPC = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerStatVol, grid.NewMSS, func() *VolInfo { return &VolInfo{} })
storageWriteMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerWriteMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload) storageUpdateMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerUpdateMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload)
storageListDirRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *ListDirResult](grid.HandlerListDir, grid.NewMSS, nil, func() *ListDirResult { return &ListDirResult{} }).WithOutCapacity(1) storageWriteMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerWriteMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload)
storageListDirRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *ListDirResult](grid.HandlerListDir, grid.NewMSS, nil, func() *ListDirResult { return &ListDirResult{} }).WithOutCapacity(1)
) )
func getStorageViaEndpoint(endpoint Endpoint) StorageAPI { func getStorageViaEndpoint(endpoint Endpoint) StorageAPI {
@ -693,10 +694,15 @@ func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*Rena
} }
sign, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath, p.Opts) sign, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath, p.Opts)
resp := &RenameDataResp{ return &RenameDataResp{
Signature: sign, Signature: sign,
} }, grid.NewRemoteErr(err)
return resp, grid.NewRemoteErr(err) }
// RenameDataInlineHandler - renames a meta object and data dir to destination.
// Same behavior as RenameDataHandler; the distinct parameter type carries
// pooled inline data, and the deferred Recycle returns that buffer to the
// pool once the rename has completed (even on panic).
func (s *storageRESTServer) RenameDataInlineHandler(p *RenameDataInlineHandlerParams) (*RenameDataResp, *grid.RemoteErr) {
defer p.Recycle()
return s.RenameDataHandler(&p.RenameDataHandlerParams)
}
// RenameFileHandler - rename a file from source to destination // RenameFileHandler - rename a file from source to destination
@ -1309,6 +1315,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
logger.FatalIf(storageWriteAllRPC.Register(gm, server.WriteAllHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageWriteAllRPC.Register(gm, server.WriteAllHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageRenameFileRPC.Register(gm, server.RenameFileHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageRenameFileRPC.Register(gm, server.RenameFileHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageRenameDataRPC.Register(gm, server.RenameDataHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageRenameDataRPC.Register(gm, server.RenameDataHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageRenameDataInlineRPC.Register(gm, server.RenameDataInlineHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageDeleteFileRPC.Register(gm, server.DeleteFileHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageDeleteFileRPC.Register(gm, server.DeleteFileHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageCheckPartsRPC.Register(gm, server.CheckPartsHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageCheckPartsRPC.Register(gm, server.CheckPartsHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageReadVersionRPC.Register(gm, server.ReadVersionHandlerWS, endpoint.Path), "unable to register handler") logger.FatalIf(storageReadVersionRPC.Register(gm, server.ReadVersionHandlerWS, endpoint.Path), "unable to register handler")

View File

@ -1664,7 +1664,7 @@ func ExecObjectLayerAPIAnonTest(t *testing.T, obj ObjectLayer, testName, bucketN
} }
} }
// ExecObjectLayerAPINilTest - Sets the object layer to `nil`, and calls rhe registered object layer API endpoint, // ExecObjectLayerAPINilTest - Sets the object layer to `nil`, and calls the registered object layer API endpoint,
// and assert the error response. The purpose is to validate the API handlers response when the object layer is uninitialized. // and assert the error response. The purpose is to validate the API handlers response when the object layer is uninitialized.
// Usage hint: Should be used at the end of the API end points tests (ex: check the last few lines of `testAPIListObjectPartsHandler`), // Usage hint: Should be used at the end of the API end points tests (ex: check the last few lines of `testAPIListObjectPartsHandler`),
// need a sample HTTP request to be sent as argument so that the relevant handler is called, the handler registration is expected // need a sample HTTP request to be sent as argument so that the relevant handler is called, the handler registration is expected

View File

@ -550,10 +550,9 @@ func (c *Connection) queueMsg(msg message, payload sender) error {
// This cannot encode subroute. // This cannot encode subroute.
msg.Flags.Clear(FlagSubroute) msg.Flags.Clear(FlagSubroute)
if payload != nil { if payload != nil {
if cap(msg.Payload) < payload.Msgsize() { if sz := payload.Msgsize(); cap(msg.Payload) < sz {
old := msg.Payload PutByteBuffer(msg.Payload)
msg.Payload = GetByteBuffer()[:0] msg.Payload = GetByteBufferCap(sz)
PutByteBuffer(old)
} }
var err error var err error
msg.Payload, err = payload.MarshalMsg(msg.Payload[:0]) msg.Payload, err = payload.MarshalMsg(msg.Payload[:0])
@ -563,7 +562,7 @@ func (c *Connection) queueMsg(msg message, payload sender) error {
} }
} }
defer PutByteBuffer(msg.Payload) defer PutByteBuffer(msg.Payload)
dst := GetByteBuffer()[:0] dst := GetByteBufferCap(msg.Msgsize())
dst, err := msg.MarshalMsg(dst) dst, err := msg.MarshalMsg(dst)
if err != nil { if err != nil {
return err return err
@ -578,9 +577,9 @@ func (c *Connection) queueMsg(msg message, payload sender) error {
// sendMsg will send // sendMsg will send
func (c *Connection) sendMsg(conn net.Conn, msg message, payload msgp.MarshalSizer) error { func (c *Connection) sendMsg(conn net.Conn, msg message, payload msgp.MarshalSizer) error {
if payload != nil { if payload != nil {
if cap(msg.Payload) < payload.Msgsize() { if sz := payload.Msgsize(); cap(msg.Payload) < sz {
PutByteBuffer(msg.Payload) PutByteBuffer(msg.Payload)
msg.Payload = GetByteBuffer()[:0] msg.Payload = GetByteBufferCap(sz)[:0]
} }
var err error var err error
msg.Payload, err = payload.MarshalMsg(msg.Payload) msg.Payload, err = payload.MarshalMsg(msg.Payload)
@ -589,7 +588,7 @@ func (c *Connection) sendMsg(conn net.Conn, msg message, payload msgp.MarshalSiz
} }
defer PutByteBuffer(msg.Payload) defer PutByteBuffer(msg.Payload)
} }
dst := GetByteBuffer()[:0] dst := GetByteBufferCap(msg.Msgsize())
dst, err := msg.MarshalMsg(dst) dst, err := msg.MarshalMsg(dst)
if err != nil { if err != nil {
return err return err

View File

@ -42,7 +42,13 @@ const (
// maxBufferSize is the maximum buffer size. // maxBufferSize is the maximum buffer size.
// Buffers larger than this is not reused. // Buffers larger than this is not reused.
maxBufferSize = 64 << 10 maxBufferSize = 96 << 10
// This is the assumed size of bigger buffers and allocation size.
biggerBufMin = 32 << 10
// This is the maximum size of bigger buffers.
biggerBufMax = maxBufferSize
// If there is a queue, merge up to this many messages. // If there is a queue, merge up to this many messages.
maxMergeMessages = 30 maxMergeMessages = 30
@ -63,6 +69,13 @@ var internalByteBuffer = sync.Pool{
}, },
} }
// internal32KByteBuffer pools the "bigger" buffers used for payloads that do
// not fit the default-size pool. New entries are allocated at biggerBufMin
// capacity; recycled entries may be larger (up to biggerBufMax).
var internal32KByteBuffer = sync.Pool{
New: func() any {
m := make([]byte, 0, biggerBufMin)
return &m
},
}
// GetByteBuffer can be replaced with a function that returns a small // GetByteBuffer can be replaced with a function that returns a small
// byte buffer. // byte buffer.
// When replacing PutByteBuffer should also be replaced // When replacing PutByteBuffer should also be replaced
@ -72,10 +85,27 @@ var GetByteBuffer = func() []byte {
return b[:0] return b[:0]
} }
// GetByteBufferCap returns a length 0 byte buffer with at least the given capacity.
//
// Requests up to defaultBufferSize are served from the small pool, requests up
// to maxBufferSize from the 32 KiB pool, and anything larger is freshly
// allocated. Pooled buffers are not guaranteed to satisfy the request:
// PutByteBuffer accepts a range of capacities (and GetByteBuffer is a
// replaceable var), so the capacity is verified before a pooled buffer is
// handed out; undersized buffers are returned to their pool and a fresh
// allocation is made instead. Without this check a caller asking for, e.g.,
// 64 KiB could receive a biggerBufMin (32 KiB) buffer.
func GetByteBufferCap(wantSz int) []byte {
	switch {
	case wantSz <= defaultBufferSize:
		b := GetByteBuffer()[:0]
		if cap(b) >= wantSz {
			return b
		}
		// Undersized for this request; recycle and allocate below.
		PutByteBuffer(b)
	case wantSz <= maxBufferSize:
		b := *internal32KByteBuffer.Get().(*[]byte)
		if cap(b) >= wantSz {
			return b[:0]
		}
		// Pool only guarantees biggerBufMin capacity; put it back.
		internal32KByteBuffer.Put(&b)
	}
	return make([]byte, 0, wantSz)
}
// PutByteBuffer is for returning byte buffers. // PutByteBuffer is for returning byte buffers.
var PutByteBuffer = func(b []byte) { var PutByteBuffer = func(b []byte) {
if cap(b) >= minBufferSize && cap(b) < maxBufferSize { if cap(b) >= biggerBufMin && cap(b) < biggerBufMax {
internal32KByteBuffer.Put(&b)
return
}
if cap(b) >= minBufferSize && cap(b) < biggerBufMin {
internalByteBuffer.Put(&b) internalByteBuffer.Put(&b)
return
} }
} }
@ -117,11 +147,7 @@ type writerWrapper struct {
} }
func (w *writerWrapper) Write(p []byte) (n int, err error) { func (w *writerWrapper) Write(p []byte) (n int, err error) {
buf := GetByteBuffer() buf := GetByteBufferCap(len(p))
if cap(buf) < len(p) {
PutByteBuffer(buf)
buf = make([]byte, len(p))
}
buf = buf[:len(p)] buf = buf[:len(p)]
copy(buf, p) copy(buf, p)
select { select {

View File

@ -111,6 +111,7 @@ const (
HandlerGetBandwidth HandlerGetBandwidth
HandlerWriteAll HandlerWriteAll
HandlerListBuckets HandlerListBuckets
HandlerRenameDataInline
// Add more above here ^^^ // Add more above here ^^^
// If all handlers are used, the type of Handler can be changed. // If all handlers are used, the type of Handler can be changed.
@ -546,7 +547,7 @@ func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Re
} }
return resp, ErrDisconnected return resp, ErrDisconnected
} }
payload, err := req.MarshalMsg(GetByteBuffer()[:0]) payload, err := req.MarshalMsg(GetByteBufferCap(req.Msgsize()))
if err != nil { if err != nil {
return resp, err return resp, err
} }
@ -788,8 +789,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func
if dropOutput { if dropOutput {
continue continue
} }
dst := GetByteBuffer() dst, err := v.MarshalMsg(GetByteBufferCap(v.Msgsize()))
dst, err := v.MarshalMsg(dst[:0])
if err != nil { if err != nil {
logger.LogOnceIf(ctx, err, err.Error()) logger.LogOnceIf(ctx, err, err.Error())
} }
@ -853,7 +853,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre
var payloadB []byte var payloadB []byte
if h.WithPayload { if h.WithPayload {
var err error var err error
payloadB, err = payload.MarshalMsg(GetByteBuffer()[:0]) payloadB, err = payload.MarshalMsg(GetByteBufferCap(payload.Msgsize()))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -875,7 +875,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre
go func() { go func() {
defer xioutil.SafeClose(stream.Requests) defer xioutil.SafeClose(stream.Requests)
for req := range reqT { for req := range reqT {
b, err := req.MarshalMsg(GetByteBuffer()[:0]) b, err := req.MarshalMsg(GetByteBufferCap(req.Msgsize()))
if err != nil { if err != nil {
logger.LogOnceIf(ctx, err, err.Error()) logger.LogOnceIf(ctx, err, err.Error())
} }

View File

@ -80,14 +80,15 @@ func _() {
_ = x[HandlerGetBandwidth-69] _ = x[HandlerGetBandwidth-69]
_ = x[HandlerWriteAll-70] _ = x[HandlerWriteAll-70]
_ = x[HandlerListBuckets-71] _ = x[HandlerListBuckets-71]
_ = x[handlerTest-72] _ = x[HandlerRenameDataInline-72]
_ = x[handlerTest2-73] _ = x[handlerTest-73]
_ = x[handlerLast-74] _ = x[handlerTest2-74]
_ = x[handlerLast-75]
} }
const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthWriteAllListBucketshandlerTesthandlerTest2handlerLast" const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthWriteAllListBucketsRenameDataInlinehandlerTesthandlerTest2handlerLast"
var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 899, 911, 922} var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 904, 915, 927, 938}
func (i HandlerID) String() string { func (i HandlerID) String() string {
if i >= HandlerID(len(_HandlerID_index)-1) { if i >= HandlerID(len(_HandlerID_index)-1) {

View File

@ -145,7 +145,7 @@ func (m *muxClient) send(msg message) error {
// sendLocked the message. msg.Seq and msg.MuxID will be set. // sendLocked the message. msg.Seq and msg.MuxID will be set.
// m.respMu must be held. // m.respMu must be held.
func (m *muxClient) sendLocked(msg message) error { func (m *muxClient) sendLocked(msg message) error {
dst := GetByteBuffer()[:0] dst := GetByteBufferCap(msg.Msgsize())
msg.Seq = m.SendSeq msg.Seq = m.SendSeq
msg.MuxID = m.MuxID msg.MuxID = m.MuxID
msg.Flags |= m.BaseFlags msg.Flags |= m.BaseFlags

View File

@ -189,6 +189,12 @@ func NewBytes() *Bytes {
return &b return &b
} }
// NewBytesCap returns an empty Bytes backed by a buffer with at least the
// given capacity, drawn from the byte-buffer pools when possible.
func NewBytesCap(size int) *Bytes {
	buf := Bytes(GetByteBufferCap(size))
	return &buf
}
// NewBytesWith returns a new Bytes with the provided content. // NewBytesWith returns a new Bytes with the provided content.
// When sent as a parameter, the caller gives up ownership of the byte slice. // When sent as a parameter, the caller gives up ownership of the byte slice.
// When returned as response, the handler also gives up ownership of the byte slice. // When returned as response, the handler also gives up ownership of the byte slice.
@ -203,14 +209,9 @@ func NewBytesWithCopyOf(b []byte) *Bytes {
bb := Bytes(nil) bb := Bytes(nil)
return &bb return &bb
} }
if len(b) < maxBufferSize { bb := NewBytesCap(len(b))
bb := NewBytes() *bb = append(*bb, b...)
*bb = append(*bb, b...) return bb
return bb
}
bb := Bytes(make([]byte, len(b)))
copy(bb, b)
return &bb
} }
// Bytes provides a byte slice that can be serialized. // Bytes provides a byte slice that can be serialized.
@ -238,7 +239,7 @@ func (b *Bytes) UnmarshalMsg(bytes []byte) ([]byte, error) {
copy(*b, val) copy(*b, val)
} else { } else {
if cap(*b) == 0 && len(val) <= maxBufferSize { if cap(*b) == 0 && len(val) <= maxBufferSize {
*b = GetByteBuffer()[:0] *b = GetByteBufferCap(len(val))
} else { } else {
PutByteBuffer(*b) PutByteBuffer(*b)
*b = make([]byte, 0, len(val)) *b = make([]byte, 0, len(val))