diff --git a/.typos.toml b/.typos.toml index a32263cad..fafe3ffd1 100644 --- a/.typos.toml +++ b/.typos.toml @@ -19,6 +19,19 @@ extend-ignore-re = [ [default.extend-words] "encrypter" = "encrypter" "requestor" = "requestor" +"KMS" = "KMS" +"kms" = "kms" +"Kms" = "Kms" +"Dur" = "Dur" +"EOF" = "EOF" +"hd" = "hd" +"ws" = "ws" +"guid" = "guid" +"lst" = "lst" +"pn" = "pn" +"Iy" = "Iy" +"ro" = "ro" +"thr" = "thr" [default.extend-identifiers] "bui" = "bui" diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index c6dc10031..413a733ec 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -29,6 +29,7 @@ import ( "time" "github.com/minio/madmin-go/v3" + "github.com/minio/minio/internal/grid" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/sync/errgroup" ) @@ -539,7 +540,10 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object } partPath := pathJoin(tmpID, dstDataDir, fmt.Sprintf("part.%d", partNumber)) if len(inlineBuffers) > 0 { - inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(latestMeta.Size)+32)) + buf := grid.GetByteBufferCap(int(erasure.ShardFileSize(latestMeta.Size)) + 64) + inlineBuffers[i] = bytes.NewBuffer(buf[:0]) + defer grid.PutByteBuffer(buf) + writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) } else { writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, partPath, diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index e53a4b3c7..52f0081b8 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -41,6 +41,7 @@ import ( "github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/grid" "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" xioutil "github.com/minio/minio/internal/ioutil" @@ -1147,7 +1148,6 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r * buffer = buffer[:fi.Erasure.BlockSize] } - shardFileSize := erasure.ShardFileSize(data.Size()) writers := make([]io.Writer, len(onlineDisks)) inlineBuffers := make([]*bytes.Buffer, len(onlineDisks)) for i, disk := range onlineDisks { @@ -1155,7 +1155,9 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r * continue } if disk.IsOnline() { - inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, shardFileSize)) + buf := grid.GetByteBufferCap(int(erasure.ShardFileSize(data.Size())) + 64) + inlineBuffers[i] = bytes.NewBuffer(buf[:0]) + defer grid.PutByteBuffer(buf) writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) } } @@ -1435,11 +1437,9 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st } if len(inlineBuffers) > 0 { - sz := shardFileSize - if sz < 0 { - sz = data.ActualSize() - } - inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, sz)) + buf := grid.GetByteBufferCap(int(shardFileSize) + 64) + inlineBuffers[i] = bytes.NewBuffer(buf[:0]) + defer grid.PutByteBuffer(buf) writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) continue } @@ -1448,7 +1448,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st } toEncode := io.Reader(data) - if data.Size() > bigFileThreshold { + if data.Size() >= bigFileThreshold { // We use 2 buffers, so we always have a full buffer of input. bufA := globalBytePoolCap.Get() bufB := globalBytePoolCap.Get() diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index 672cd8bed..01d29a44a 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -21,6 +21,8 @@ import ( "time" "github.com/minio/minio/internal/crypto" + "github.com/minio/minio/internal/grid" + xioutil "github.com/minio/minio/internal/ioutil" ) //go:generate msgp -file=$GOFILE @@ -433,6 +435,27 @@ type RenameDataHandlerParams struct { Opts RenameOptions `msg:"ro"` } +// RenameDataInlineHandlerParams are parameters for RenameDataHandler with a buffer for inline data. +type RenameDataInlineHandlerParams struct { + RenameDataHandlerParams `msg:"p"` +} + +func newRenameDataInlineHandlerParams() *RenameDataInlineHandlerParams { + buf := grid.GetByteBufferCap(32 + 16<<10) + return &RenameDataInlineHandlerParams{RenameDataHandlerParams{FI: FileInfo{Data: buf[:0]}}} +} + +// Recycle will reuse the memory allocated for the FileInfo data. +func (r *RenameDataInlineHandlerParams) Recycle() { + if r == nil { + return + } + if cap(r.FI.Data) >= xioutil.BlockSizeSmall { + grid.PutByteBuffer(r.FI.Data) + r.FI.Data = nil + } +} + // RenameFileHandlerParams are parameters for RenameFileHandler. type RenameFileHandlerParams struct { DiskID string `msg:"id"` diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index abf90e4e8..7bbeb3ebd 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -4626,6 +4626,113 @@ func (z *RenameDataHandlerParams) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *RenameDataInlineHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "p": + err = z.RenameDataHandlerParams.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "RenameDataHandlerParams") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *RenameDataInlineHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "p" + err = en.Append(0x81, 0xa1, 0x70) + if err != nil { + return + } + err = z.RenameDataHandlerParams.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "RenameDataHandlerParams") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *RenameDataInlineHandlerParams) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "p" + o = append(o, 0x81, 0xa1, 0x70) + o, err = z.RenameDataHandlerParams.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "RenameDataHandlerParams") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *RenameDataInlineHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "p": + bts, err = z.RenameDataHandlerParams.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "RenameDataHandlerParams") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *RenameDataInlineHandlerParams) Msgsize() (s int) { + s = 1 + 2 + z.RenameDataHandlerParams.Msgsize() + return +} + // DecodeMsg implements msgp.Decodable func (z *RenameDataResp) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/cmd/storage-datatypes_gen_test.go b/cmd/storage-datatypes_gen_test.go index cee67107f..a801cfd34 100644 --- a/cmd/storage-datatypes_gen_test.go +++ b/cmd/storage-datatypes_gen_test.go @@ -2156,6 +2156,119 @@ func BenchmarkDecodeRenameDataHandlerParams(b *testing.B) { } } +func TestMarshalUnmarshalRenameDataInlineHandlerParams(t *testing.T) { + v := RenameDataInlineHandlerParams{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgRenameDataInlineHandlerParams(b *testing.B) { + v := RenameDataInlineHandlerParams{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgRenameDataInlineHandlerParams(b *testing.B) { + v := RenameDataInlineHandlerParams{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalRenameDataInlineHandlerParams(b *testing.B) { + v := RenameDataInlineHandlerParams{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeRenameDataInlineHandlerParams(t *testing.T) { + v := RenameDataInlineHandlerParams{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeRenameDataInlineHandlerParams Msgsize() is inaccurate") + } + + vn := RenameDataInlineHandlerParams{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeRenameDataInlineHandlerParams(b *testing.B) { + v := RenameDataInlineHandlerParams{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeRenameDataInlineHandlerParams(b *testing.B) { + v := RenameDataInlineHandlerParams{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalRenameDataResp(t *testing.T) { v := RenameDataResp{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index daa52a704..a991284ad 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -468,7 +468,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string, // RenameData - rename source path to destination path atomically, metadata and data file. func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) { - resp, err := storageRenameDataRPC.Call(ctx, client.gridConn, &RenameDataHandlerParams{ + params := RenameDataHandlerParams{ DiskID: client.diskID, SrcVolume: srcVolume, SrcPath: srcPath, @@ -476,10 +476,17 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP DstVolume: dstVolume, FI: fi, Opts: opts, - }) + } + var resp *RenameDataResp + if fi.Data == nil { + resp, err = storageRenameDataRPC.Call(ctx, client.gridConn, ¶ms) + } else { + resp, err = storageRenameDataInlineRPC.Call(ctx, client.gridConn, &RenameDataInlineHandlerParams{params}) + } if err != nil { return 0, toStorageErr(err) } + defer storageRenameDataRPC.PutResponse(resp) return resp.Signature, nil } diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 994b7b0eb..b860ce2dc 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -58,21 +58,22 @@ type storageRESTServer struct { } var ( - storageCheckPartsRPC = grid.NewSingleHandler[*CheckPartsHandlerParams, grid.NoPayload](grid.HandlerCheckParts, func() *CheckPartsHandlerParams { return &CheckPartsHandlerParams{} }, grid.NewNoPayload) - storageDeleteFileRPC = grid.NewSingleHandler[*DeleteFileHandlerParams, grid.NoPayload](grid.HandlerDeleteFile, func() *DeleteFileHandlerParams { return &DeleteFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true) - storageDeleteVersionRPC = grid.NewSingleHandler[*DeleteVersionHandlerParams, grid.NoPayload](grid.HandlerDeleteVersion, func() *DeleteVersionHandlerParams { return &DeleteVersionHandlerParams{} }, grid.NewNoPayload) - storageDiskInfoRPC = grid.NewSingleHandler[*DiskInfoOptions, *DiskInfo](grid.HandlerDiskInfo, func() *DiskInfoOptions { return &DiskInfoOptions{} }, func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse().AllowCallRequestPool(true) - storageNSScannerRPC = grid.NewStream[*nsScannerOptions, grid.NoPayload, *nsScannerResp](grid.HandlerNSScanner, func() *nsScannerOptions { return &nsScannerOptions{} }, nil, func() *nsScannerResp { return &nsScannerResp{} }) - storageReadAllRPC = grid.NewSingleHandler[*ReadAllHandlerParams, *grid.Bytes](grid.HandlerReadAll, func() *ReadAllHandlerParams { return &ReadAllHandlerParams{} }, grid.NewBytes).AllowCallRequestPool(true) - storageWriteAllRPC = grid.NewSingleHandler[*WriteAllHandlerParams, grid.NoPayload](grid.HandlerWriteAll, func() *WriteAllHandlerParams { return &WriteAllHandlerParams{} }, grid.NewNoPayload) - storageReadVersionRPC = grid.NewSingleHandler[*grid.MSS, *FileInfo](grid.HandlerReadVersion, grid.NewMSS, func() *FileInfo { return &FileInfo{} }) - storageReadXLRPC = grid.NewSingleHandler[*grid.MSS, *RawFileInfo](grid.HandlerReadXL, grid.NewMSS, func() *RawFileInfo { return &RawFileInfo{} }) - storageRenameDataRPC = grid.NewSingleHandler[*RenameDataHandlerParams, *RenameDataResp](grid.HandlerRenameData, func() *RenameDataHandlerParams { return &RenameDataHandlerParams{} }, func() *RenameDataResp { return &RenameDataResp{} }) - storageRenameFileRPC = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams { return &RenameFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true) - storageStatVolRPC = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerStatVol, grid.NewMSS, func() *VolInfo { return &VolInfo{} }) - storageUpdateMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerUpdateMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload) - storageWriteMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerWriteMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload) - storageListDirRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *ListDirResult](grid.HandlerListDir, grid.NewMSS, nil, func() *ListDirResult { return &ListDirResult{} }).WithOutCapacity(1) + storageCheckPartsRPC = grid.NewSingleHandler[*CheckPartsHandlerParams, grid.NoPayload](grid.HandlerCheckParts, func() *CheckPartsHandlerParams { return &CheckPartsHandlerParams{} }, grid.NewNoPayload) + storageDeleteFileRPC = grid.NewSingleHandler[*DeleteFileHandlerParams, grid.NoPayload](grid.HandlerDeleteFile, func() *DeleteFileHandlerParams { return &DeleteFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true) + storageDeleteVersionRPC = grid.NewSingleHandler[*DeleteVersionHandlerParams, grid.NoPayload](grid.HandlerDeleteVersion, func() *DeleteVersionHandlerParams { return &DeleteVersionHandlerParams{} }, grid.NewNoPayload) + storageDiskInfoRPC = grid.NewSingleHandler[*DiskInfoOptions, *DiskInfo](grid.HandlerDiskInfo, func() *DiskInfoOptions { return &DiskInfoOptions{} }, func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse().AllowCallRequestPool(true) + storageNSScannerRPC = grid.NewStream[*nsScannerOptions, grid.NoPayload, *nsScannerResp](grid.HandlerNSScanner, func() *nsScannerOptions { return &nsScannerOptions{} }, nil, func() *nsScannerResp { return &nsScannerResp{} }) + storageReadAllRPC = grid.NewSingleHandler[*ReadAllHandlerParams, *grid.Bytes](grid.HandlerReadAll, func() *ReadAllHandlerParams { return &ReadAllHandlerParams{} }, grid.NewBytes).AllowCallRequestPool(true) + storageWriteAllRPC = grid.NewSingleHandler[*WriteAllHandlerParams, grid.NoPayload](grid.HandlerWriteAll, func() *WriteAllHandlerParams { return &WriteAllHandlerParams{} }, grid.NewNoPayload) + storageReadVersionRPC = grid.NewSingleHandler[*grid.MSS, *FileInfo](grid.HandlerReadVersion, grid.NewMSS, func() *FileInfo { return &FileInfo{} }) + storageReadXLRPC = grid.NewSingleHandler[*grid.MSS, *RawFileInfo](grid.HandlerReadXL, grid.NewMSS, func() *RawFileInfo { return &RawFileInfo{} }) + storageRenameDataRPC = grid.NewSingleHandler[*RenameDataHandlerParams, *RenameDataResp](grid.HandlerRenameData, func() *RenameDataHandlerParams { return &RenameDataHandlerParams{} }, func() *RenameDataResp { return &RenameDataResp{} }) + storageRenameDataInlineRPC = grid.NewSingleHandler[*RenameDataInlineHandlerParams, *RenameDataResp](grid.HandlerRenameDataInline, newRenameDataInlineHandlerParams, func() *RenameDataResp { return &RenameDataResp{} }).AllowCallRequestPool(false) + storageRenameFileRPC = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams { return &RenameFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true) + storageStatVolRPC = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerStatVol, grid.NewMSS, func() *VolInfo { return &VolInfo{} }) + storageUpdateMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerUpdateMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload) + storageWriteMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerWriteMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload) + storageListDirRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *ListDirResult](grid.HandlerListDir, grid.NewMSS, nil, func() *ListDirResult { return &ListDirResult{} }).WithOutCapacity(1) ) func getStorageViaEndpoint(endpoint Endpoint) StorageAPI { @@ -693,10 +694,15 @@ func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*Rena } sign, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath, p.Opts) - resp := &RenameDataResp{ + return &RenameDataResp{ Signature: sign, - } - return resp, grid.NewRemoteErr(err) + }, grid.NewRemoteErr(err) +} + +// RenameDataInlineHandler - renames a meta object and data dir to destination. +func (s *storageRESTServer) RenameDataInlineHandler(p *RenameDataInlineHandlerParams) (*RenameDataResp, *grid.RemoteErr) { + defer p.Recycle() + return s.RenameDataHandler(&p.RenameDataHandlerParams) } // RenameFileHandler - rename a file from source to destination @@ -1309,6 +1315,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin logger.FatalIf(storageWriteAllRPC.Register(gm, server.WriteAllHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageRenameFileRPC.Register(gm, server.RenameFileHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageRenameDataRPC.Register(gm, server.RenameDataHandler, endpoint.Path), "unable to register handler") + logger.FatalIf(storageRenameDataInlineRPC.Register(gm, server.RenameDataInlineHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageDeleteFileRPC.Register(gm, server.DeleteFileHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageCheckPartsRPC.Register(gm, server.CheckPartsHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageReadVersionRPC.Register(gm, server.ReadVersionHandlerWS, endpoint.Path), "unable to register handler") diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 39970ccd0..acf3b52f5 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -1664,7 +1664,7 @@ func ExecObjectLayerAPIAnonTest(t *testing.T, obj ObjectLayer, testName, bucketN } } -// ExecObjectLayerAPINilTest - Sets the object layer to `nil`, and calls rhe registered object layer API endpoint, +// ExecObjectLayerAPINilTest - Sets the object layer to `nil`, and calls the registered object layer API endpoint, // and assert the error response. The purpose is to validate the API handlers response when the object layer is uninitialized. // Usage hint: Should be used at the end of the API end points tests (ex: check the last few lines of `testAPIListObjectPartsHandler`), // need a sample HTTP request to be sent as argument so that the relevant handler is called, the handler registration is expected diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 6bb2da408..022aa247d 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -550,10 +550,9 @@ func (c *Connection) queueMsg(msg message, payload sender) error { // This cannot encode subroute. msg.Flags.Clear(FlagSubroute) if payload != nil { - if cap(msg.Payload) < payload.Msgsize() { - old := msg.Payload - msg.Payload = GetByteBuffer()[:0] - PutByteBuffer(old) + if sz := payload.Msgsize(); cap(msg.Payload) < sz { + PutByteBuffer(msg.Payload) + msg.Payload = GetByteBufferCap(sz) } var err error msg.Payload, err = payload.MarshalMsg(msg.Payload[:0]) @@ -563,7 +562,7 @@ func (c *Connection) queueMsg(msg message, payload sender) error { } } defer PutByteBuffer(msg.Payload) - dst := GetByteBuffer()[:0] + dst := GetByteBufferCap(msg.Msgsize()) dst, err := msg.MarshalMsg(dst) if err != nil { return err @@ -578,9 +577,9 @@ func (c *Connection) queueMsg(msg message, payload sender) error { // sendMsg will send func (c *Connection) sendMsg(conn net.Conn, msg message, payload msgp.MarshalSizer) error { if payload != nil { - if cap(msg.Payload) < payload.Msgsize() { + if sz := payload.Msgsize(); cap(msg.Payload) < sz { PutByteBuffer(msg.Payload) - msg.Payload = GetByteBuffer()[:0] + msg.Payload = GetByteBufferCap(sz)[:0] } var err error msg.Payload, err = payload.MarshalMsg(msg.Payload) @@ -589,7 +588,7 @@ func (c *Connection) sendMsg(conn net.Conn, msg message, payload msgp.MarshalSiz } defer PutByteBuffer(msg.Payload) } - dst := GetByteBuffer()[:0] + dst := GetByteBufferCap(msg.Msgsize()) dst, err := msg.MarshalMsg(dst) if err != nil { return err diff --git a/internal/grid/grid.go b/internal/grid/grid.go index 2652ce053..a26363f5b 100644 --- a/internal/grid/grid.go +++ b/internal/grid/grid.go @@ -42,7 +42,13 @@ const ( // maxBufferSize is the maximum buffer size. // Buffers larger than this is not reused. - maxBufferSize = 64 << 10 + maxBufferSize = 96 << 10 + + // This is the assumed size of bigger buffers and allocation size. + biggerBufMin = 32 << 10 + + // This is the maximum size of bigger buffers. + biggerBufMax = maxBufferSize // If there is a queue, merge up to this many messages. maxMergeMessages = 30 @@ -63,6 +69,13 @@ var internalByteBuffer = sync.Pool{ }, } +var internal32KByteBuffer = sync.Pool{ + New: func() any { + m := make([]byte, 0, biggerBufMin) + return &m + }, +} + // GetByteBuffer can be replaced with a function that returns a small // byte buffer. // When replacing PutByteBuffer should also be replaced @@ -72,10 +85,27 @@ var GetByteBuffer = func() []byte { return b[:0] } +// GetByteBufferCap returns a length 0 byte buffer with at least the given capacity. +func GetByteBufferCap(wantSz int) []byte { + switch { + case wantSz <= defaultBufferSize: + return GetByteBuffer()[:0] + case wantSz <= maxBufferSize: + b := *internal32KByteBuffer.Get().(*[]byte) + return b[:0] + } + return make([]byte, 0, wantSz) +} + // PutByteBuffer is for returning byte buffers. var PutByteBuffer = func(b []byte) { - if cap(b) >= minBufferSize && cap(b) < maxBufferSize { + if cap(b) >= biggerBufMin && cap(b) < biggerBufMax { + internal32KByteBuffer.Put(&b) + return + } + if cap(b) >= minBufferSize && cap(b) < biggerBufMin { internalByteBuffer.Put(&b) + return } } @@ -117,11 +147,7 @@ type writerWrapper struct { } func (w *writerWrapper) Write(p []byte) (n int, err error) { - buf := GetByteBuffer() - if cap(buf) < len(p) { - PutByteBuffer(buf) - buf = make([]byte, len(p)) - } + buf := GetByteBufferCap(len(p)) buf = buf[:len(p)] copy(buf, p) select { diff --git a/internal/grid/handlers.go b/internal/grid/handlers.go index 59b03ba69..656579bf9 100644 --- a/internal/grid/handlers.go +++ b/internal/grid/handlers.go @@ -111,6 +111,7 @@ const ( HandlerGetBandwidth HandlerWriteAll HandlerListBuckets + HandlerRenameDataInline // Add more above here ^^^ // If all handlers are used, the type of Handler can be changed. @@ -546,7 +547,7 @@ func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Re } return resp, ErrDisconnected } - payload, err := req.MarshalMsg(GetByteBuffer()[:0]) + payload, err := req.MarshalMsg(GetByteBufferCap(req.Msgsize())) if err != nil { return resp, err } @@ -788,8 +789,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) register(m *Manager, handle func if dropOutput { continue } - dst := GetByteBuffer() - dst, err := v.MarshalMsg(dst[:0]) + dst, err := v.MarshalMsg(GetByteBufferCap(v.Msgsize())) if err != nil { logger.LogOnceIf(ctx, err, err.Error()) } @@ -853,7 +853,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre var payloadB []byte if h.WithPayload { var err error - payloadB, err = payload.MarshalMsg(GetByteBuffer()[:0]) + payloadB, err = payload.MarshalMsg(GetByteBufferCap(payload.Msgsize())) if err != nil { return nil, err } @@ -875,7 +875,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre go func() { defer xioutil.SafeClose(stream.Requests) for req := range reqT { - b, err := req.MarshalMsg(GetByteBuffer()[:0]) + b, err := req.MarshalMsg(GetByteBufferCap(req.Msgsize())) if err != nil { logger.LogOnceIf(ctx, err, err.Error()) } diff --git a/internal/grid/handlers_string.go b/internal/grid/handlers_string.go index 6474ec2da..fa712990b 100644 --- a/internal/grid/handlers_string.go +++ b/internal/grid/handlers_string.go @@ -80,14 +80,15 @@ func _() { _ = x[HandlerGetBandwidth-69] _ = x[HandlerWriteAll-70] _ = x[HandlerListBuckets-71] - _ = x[handlerTest-72] - _ = x[handlerTest2-73] - _ = x[handlerLast-74] + _ = x[HandlerRenameDataInline-72] + _ = x[handlerTest-73] + _ = x[handlerTest2-74] + _ = x[handlerLast-75] } -const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthWriteAllListBucketshandlerTesthandlerTest2handlerLast" +const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthWriteAllListBucketsRenameDataInlinehandlerTesthandlerTest2handlerLast" -var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 899, 911, 922} +var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 904, 915, 927, 938} func (i HandlerID) String() string { if i >= HandlerID(len(_HandlerID_index)-1) { diff --git a/internal/grid/muxclient.go b/internal/grid/muxclient.go index 7fa4ce29a..feabddec3 100644 --- a/internal/grid/muxclient.go +++ b/internal/grid/muxclient.go @@ -145,7 +145,7 @@ func (m *muxClient) send(msg message) error { // sendLocked the message. msg.Seq and msg.MuxID will be set. // m.respMu must be held. func (m *muxClient) sendLocked(msg message) error { - dst := GetByteBuffer()[:0] + dst := GetByteBufferCap(msg.Msgsize()) msg.Seq = m.SendSeq msg.MuxID = m.MuxID msg.Flags |= m.BaseFlags diff --git a/internal/grid/types.go b/internal/grid/types.go index f60f29d24..4422372dc 100644 --- a/internal/grid/types.go +++ b/internal/grid/types.go @@ -189,6 +189,12 @@ func NewBytes() *Bytes { return &b } +// NewBytesCap returns an empty Bytes with the given capacity. +func NewBytesCap(size int) *Bytes { + b := Bytes(GetByteBufferCap(size)) + return &b +} + // NewBytesWith returns a new Bytes with the provided content. // When sent as a parameter, the caller gives up ownership of the byte slice. // When returned as response, the handler also gives up ownership of the byte slice. @@ -203,14 +209,9 @@ func NewBytesWithCopyOf(b []byte) *Bytes { bb := Bytes(nil) return &bb } - if len(b) < maxBufferSize { - bb := NewBytes() - *bb = append(*bb, b...) - return bb - } - bb := Bytes(make([]byte, len(b))) - copy(bb, b) - return &bb + bb := NewBytesCap(len(b)) + *bb = append(*bb, b...) + return bb } // Bytes provides a byte slice that can be serialized. @@ -238,7 +239,7 @@ func (b *Bytes) UnmarshalMsg(bytes []byte) ([]byte, error) { copy(*b, val) } else { if cap(*b) == 0 && len(val) <= maxBufferSize { - *b = GetByteBuffer()[:0] + *b = GetByteBufferCap(len(val)) } else { PutByteBuffer(*b) *b = make([]byte, 0, len(val))