diff --git a/cmd/common-main.go b/cmd/common-main.go
index 2b52a2df4..251ad4fa0 100644
--- a/cmd/common-main.go
+++ b/cmd/common-main.go
@@ -355,10 +355,8 @@ func buildServerCtxt(ctx *cli.Context, ctxt *serverCtxt) (err error) {
 	}
 
 	// Check "no-compat" flag from command line argument.
-	ctxt.StrictS3Compat = true
-	if ctx.IsSet("no-compat") || ctx.GlobalIsSet("no-compat") {
-		ctxt.StrictS3Compat = false
-	}
+	ctxt.StrictS3Compat = !(ctx.IsSet("no-compat") || ctx.GlobalIsSet("no-compat"))
+	ctxt.PreAllocate = ctx.IsSet("pre-allocate") || ctx.GlobalIsSet("pre-allocate")
 
 	switch {
 	case ctx.IsSet("config-dir"):
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 0ae12d56c..3a62b67a8 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -662,12 +662,12 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 			// Account for padding and forced compression overhead and encryption.
 			buffer = make([]byte, data.ActualSize()+256+32+32, data.ActualSize()*2+512)
 		} else {
-			buffer = er.bp.Get()
-			defer er.bp.Put(buffer)
+			buffer = globalBytePoolCap.Get()
+			defer globalBytePoolCap.Put(buffer)
 		}
 	case size >= fi.Erasure.BlockSize:
-		buffer = er.bp.Get()
-		defer er.bp.Put(buffer)
+		buffer = globalBytePoolCap.Get()
+		defer globalBytePoolCap.Put(buffer)
 	case size < fi.Erasure.BlockSize:
 		// No need to allocate fully fi.Erasure.BlockSize buffer if the incoming data is smaller.
 		buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
@@ -688,10 +688,10 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	if data.Size() > bigFileThreshold {
 		// Add input readahead.
 		// We use 2 buffers, so we always have a full buffer of input.
-		bufA := er.bp.Get()
-		bufB := er.bp.Get()
-		defer er.bp.Put(bufA)
-		defer er.bp.Put(bufB)
+		bufA := globalBytePoolCap.Get()
+		bufB := globalBytePoolCap.Get()
+		defer globalBytePoolCap.Put(bufA)
+		defer globalBytePoolCap.Put(bufB)
 		ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
 		if err == nil {
 			toEncode = ra
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index fc74295f4..c5744df86 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -1127,8 +1127,8 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
 	case size == 0:
 		buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
 	case size >= fi.Erasure.BlockSize:
-		buffer = er.bp.Get()
-		defer er.bp.Put(buffer)
+		buffer = globalBytePoolCap.Get()
+		defer globalBytePoolCap.Put(buffer)
 	case size < fi.Erasure.BlockSize:
 		// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
 		buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
@@ -1378,8 +1378,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	case size == 0:
 		buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
 	case size >= fi.Erasure.BlockSize || size == -1:
-		buffer = er.bp.Get()
-		defer er.bp.Put(buffer)
+		buffer = globalBytePoolCap.Get()
+		defer globalBytePoolCap.Put(buffer)
 	case size < fi.Erasure.BlockSize:
 		// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
 		buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
@@ -1438,10 +1438,10 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	toEncode := io.Reader(data)
 	if data.Size() > bigFileThreshold {
 		// We use 2 buffers, so we always have a full buffer of input.
-		bufA := er.bp.Get()
-		bufB := er.bp.Get()
-		defer er.bp.Put(bufA)
-		defer er.bp.Put(bufB)
+		bufA := globalBytePoolCap.Get()
+		bufB := globalBytePoolCap.Get()
+		defer globalBytePoolCap.Put(bufA)
+		defer globalBytePoolCap.Put(bufB)
 		ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
 		if err == nil {
 			toEncode = ra
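
Editor's note on the three files above: every call site changes the same way, dropping the per-erasure-set er.bp/er.bpOld pools in favour of the shared globalBytePoolCap. A minimal, self-contained sketch of the borrow/return pattern those call sites rely on is below; the leakyPool type and readBlock helper are illustrative stand-ins, not code from this patch.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// leakyPool is a stand-in for bpool.BytePoolCap: a bounded channel of
// reusable buffers. Get falls back to allocating when the pool is empty,
// Put drops the buffer when the pool is already full.
type leakyPool struct {
	c chan []byte
	w int
}

func newLeakyPool(entries, width int) *leakyPool {
	return &leakyPool{c: make(chan []byte, entries), w: width}
}

func (p *leakyPool) Get() []byte {
	select {
	case b := <-p.c:
		return b // reuse a pooled buffer
	default:
		return make([]byte, p.w) // pool empty, allocate
	}
}

func (p *leakyPool) Put(b []byte) {
	select {
	case p.c <- b: // keep the buffer for the next caller
	default: // pool full, let the GC reclaim it
	}
}

// readBlock mirrors the call sites above: borrow a block-sized buffer for
// large or unknown-size payloads, allocate directly for small ones, and
// always return what was borrowed.
func readBlock(pool *leakyPool, r io.Reader, size, blockSize int64) (int, error) {
	var buf []byte
	if size >= blockSize || size == -1 {
		buf = pool.Get()
		defer pool.Put(buf)
	} else {
		buf = make([]byte, size)
	}
	return io.ReadFull(r, buf)
}

func main() {
	pool := newLeakyPool(4, 8)
	n, err := readBlock(pool, bytes.NewReader([]byte("0123456789abcdef")), 16, 8)
	fmt.Println(n, err) // 8 <nil>: the pooled 8-byte buffer was filled
}
```

The leaky Put (drop the buffer when the channel is full) is what keeps the pool bounded without any locking.
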
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 6ee016514..c446b8e04 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -38,6 +38,7 @@ import (
 	"github.com/minio/minio-go/v7/pkg/s3utils"
 	"github.com/minio/minio-go/v7/pkg/set"
 	"github.com/minio/minio-go/v7/pkg/tags"
+	"github.com/minio/minio/internal/bpool"
 	"github.com/minio/minio/internal/config/storageclass"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/v2/sync/errgroup"
@@ -82,6 +83,21 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 		}
 	)
 
+	// Maximum number of reusable buffers per node at any given point in time.
+	n := 1024 // single node single/multiple drives set this to 1024 entries
+
+	if globalIsDistErasure {
+		n = 2048
+	}
+
+	// Initialize byte pool once for all sets, bpool size is set to
+	// setCount * setDriveCount with each memory upto blockSizeV2.
+	globalBytePoolCap = bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2)
+
+	if globalServerCtxt.PreAllocate {
+		globalBytePoolCap.Populate()
+	}
+
 	var localDrives []StorageAPI
 	local := endpointServerPools.FirstLocal()
 	for i, ep := range endpointServerPools {
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 22ed3a7f4..97a860210 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -36,7 +36,6 @@ import (
 	"github.com/minio/madmin-go/v3"
 	"github.com/minio/minio-go/v7/pkg/set"
 	"github.com/minio/minio-go/v7/pkg/tags"
-	"github.com/minio/minio/internal/bpool"
 	"github.com/minio/minio/internal/dsync"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/v2/console"
@@ -369,21 +368,6 @@ func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks [
 
 	mutex := newNSLock(globalIsDistErasure)
 
-	// Number of buffers, max 2GB
-	n := (2 * humanize.GiByte) / (blockSizeV2 * 2)
-
-	// Initialize byte pool once for all sets, bpool size is set to
-	// setCount * setDriveCount with each memory upto blockSizeV2.
-	bp := bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2)
-
-	// Initialize byte pool for all sets, bpool size is set to
-	// setCount * setDriveCount with each memory upto blockSizeV1
-	//
-	// Number of buffers, max 10GiB
-	m := (10 * humanize.GiByte) / (blockSizeV1 * 2)
-
-	bpOld := bpool.NewBytePoolCap(m, blockSizeV1, blockSizeV1*2)
-
 	for i := 0; i < setCount; i++ {
 		s.erasureDisks[i] = make([]StorageAPI, setDriveCount)
 	}
@@ -459,8 +443,6 @@ func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks [
 				getLockers:    s.GetLockers(i),
 				getEndpoints:  s.GetEndpoints(i),
 				nsMutex:       mutex,
-				bp:            bp,
-				bpOld:         bpOld,
 			}
 		}(i)
 	}
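
The pool setup moves from newErasureSets, where it ran once per server pool and sized two pools purely by memory budget, into newErasureServerPools with a fixed per-node entry count: 1024 entries, or 2048 when distributed. A back-of-the-envelope comparison of the worst-case pooled memory, assuming blockSizeV2 = 1 MiB and blockSizeV1 = 10 MiB (constants defined elsewhere in the MinIO sources, not in this diff):

```go
package main

import "fmt"

// Worst-case pooled memory, before and after this patch. The block sizes
// are assumptions taken from constants MinIO defines elsewhere, not values
// shown in this diff.
const (
	mib         = 1 << 20
	gib         = 1 << 30
	blockSizeV1 = 10 * mib
	blockSizeV2 = 1 * mib
)

func main() {
	// Old: two pools created on every newErasureSets call (one per server
	// pool), sized by a memory budget rather than an entry count.
	n := (2 * gib) / (blockSizeV2 * 2)  // 1024 buffers, 2 MiB cap each
	m := (10 * gib) / (blockSizeV1 * 2) // 512 buffers, 20 MiB cap each
	oldWorst := int64(n)*(blockSizeV2*2) + int64(m)*(blockSizeV1*2)
	fmt.Printf("old worst case per server pool: %d GiB\n", oldWorst/gib) // 12 GiB

	// New: one global pool per node, 1024 entries (2048 when distributed),
	// each buffer capped at blockSizeV2*2.
	newWorst := int64(2048) * (blockSizeV2 * 2)
	fmt.Printf("new worst case per node:        %d GiB\n", newWorst/gib) // 4 GiB
}
```

Worth noting: the "setCount * setDriveCount" wording carried over into the new comment no longer describes how the size is chosen; it is now a flat per-node entry count.
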
diff --git a/cmd/erasure.go b/cmd/erasure.go
index b60bc44eb..6800392d2 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -29,7 +29,6 @@ import (
 	"time"
 
 	"github.com/minio/madmin-go/v3"
-	"github.com/minio/minio/internal/bpool"
 	"github.com/minio/minio/internal/dsync"
 	xioutil "github.com/minio/minio/internal/ioutil"
 	"github.com/minio/minio/internal/logger"
@@ -65,13 +64,6 @@ type erasureObjects struct {
 
 	// Locker mutex map.
 	nsMutex *nsLockMap
-
-	// Byte pools used for temporary i/o buffers.
-	bp *bpool.BytePoolCap
-
-	// Byte pools used for temporary i/o buffers,
-	// legacy objects.
-	bpOld *bpool.BytePoolCap
 }
 
 // NewNSLock - initialize a new namespace RWLocker instance.
diff --git a/cmd/globals.go b/cmd/globals.go
index c2ac6e74d..3439c5122 100644
--- a/cmd/globals.go
+++ b/cmd/globals.go
@@ -30,6 +30,7 @@ import (
 	"github.com/minio/madmin-go/v3"
 	"github.com/minio/minio-go/v7"
 	"github.com/minio/minio-go/v7/pkg/set"
+	"github.com/minio/minio/internal/bpool"
 	"github.com/minio/minio/internal/bucket/bandwidth"
 	"github.com/minio/minio/internal/config"
 	"github.com/minio/minio/internal/handlers"
@@ -141,6 +142,7 @@ type serverCtxt struct {
 	JSON, Quiet               bool
 	Anonymous                 bool
 	StrictS3Compat            bool
+	PreAllocate               bool
 	Addr, ConsoleAddr         string
 	ConfigDir, CertsDir       string
 	configDirSet, certsDirSet bool
@@ -225,6 +227,7 @@ var (
 	globalBucketMonitor *bandwidth.Monitor
 	globalPolicySys     *PolicySys
 	globalIAMSys        *IAMSys
+	globalBytePoolCap   *bpool.BytePoolCap
 	globalLifecycleSys  *LifecycleSys
 
 	globalBucketSSEConfigSys *BucketSSEConfigSys
diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go
index ce8d802de..be36895bb 100644
--- a/cmd/object-handlers.go
+++ b/cmd/object-handlers.go
@@ -1982,20 +1982,21 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
 		md5hex = "" // Do not try to verify the content.
 		sha256hex = ""
 	}
-	var hashReader *hash.Reader
-	// Optimization: If SSE-KMS and SSE-C did not request Content-Md5. Use uuid as etag
-	if !etag.ContentMD5Requested(r.Header) && (crypto.S3KMS.IsRequested(r.Header) || crypto.SSEC.IsRequested(r.Header)) {
-		hashReader, err = hash.NewReaderWithOpts(ctx, reader, hash.Options{
-			Size:       size,
-			MD5Hex:     md5hex,
-			SHA256Hex:  sha256hex,
-			ActualSize: actualSize,
-			DisableMD5: false,
-			ForceMD5:   mustGetUUIDBytes(),
-		})
-	} else {
-		hashReader, err = hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
+
+	var forceMD5 []byte
+	// Optimization: If SSE-KMS and SSE-C did not request Content-Md5. Use uuid as etag. Optionally enable this also
+	// for server that is started with `--no-compat`.
+	if !etag.ContentMD5Requested(r.Header) && (crypto.S3KMS.IsRequested(r.Header) || crypto.SSEC.IsRequested(r.Header) || !globalServerCtxt.StrictS3Compat) {
+		forceMD5 = mustGetUUIDBytes()
 	}
+	hashReader, err := hash.NewReaderWithOpts(ctx, reader, hash.Options{
+		Size:       size,
+		MD5Hex:     md5hex,
+		SHA256Hex:  sha256hex,
+		ActualSize: actualSize,
+		DisableMD5: false,
+		ForceMD5:   forceMD5,
+	})
 	if err != nil {
 		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
 		return
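
In PutObjectHandler the two reader-construction paths collapse into one hash.NewReaderWithOpts call: ForceMD5 is pre-seeded with a UUID whenever the client sent no Content-MD5 and either the payload is SSE-C/SSE-KMS encrypted or the server runs with `--no-compat`, so the stored ETag becomes a random UUID and the MD5 pass over the payload can be skipped. A sketch of just that decision follows; etagOptions, randomUUIDBytes, and chooseETagSource are illustrative names, not MinIO APIs.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// etagOptions stands in for the only part of hash.Options this hunk cares
// about: a non-nil ForceMD5 pre-seeds the value used as the object's ETag.
type etagOptions struct {
	ForceMD5 []byte
}

// randomUUIDBytes plays the role of mustGetUUIDBytes(): any random unique
// value will do, since the result is only ever surfaced as an ETag.
func randomUUIDBytes() []byte {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return []byte(hex.EncodeToString(b))
}

// chooseETagSource mirrors the condition in the hunk above: skip computing
// a content MD5 whenever nobody can check it (no Content-MD5 header and the
// payload is SSE-C/SSE-KMS encrypted) or strict S3 compatibility is off.
func chooseETagSource(contentMD5Requested, sseKMS, sseC, strictS3Compat bool) etagOptions {
	var forceMD5 []byte
	if !contentMD5Requested && (sseKMS || sseC || !strictS3Compat) {
		forceMD5 = randomUUIDBytes()
	}
	return etagOptions{ForceMD5: forceMD5}
}

func main() {
	opts := chooseETagSource(false, false, false, false) // server started with --no-compat
	fmt.Println("uuid etag:", opts.ForceMD5 != nil)      // true: MD5 work skipped
	opts = chooseETagSource(true, true, false, true)     // client sent Content-MD5
	fmt.Println("uuid etag:", opts.ForceMD5 != nil)      // false: real content MD5 kept
}
```

PutObjectPartHandler in the next file switches to the same constructor but keeps ForceMD5 nil, so part ETags keep their existing behaviour.
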
diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go
index 9e52c828f..d27123b59 100644
--- a/cmd/object-multipart-handlers.go
+++ b/cmd/object-multipart-handlers.go
@@ -728,7 +728,14 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
 		sha256hex = ""
 	}
 
-	hashReader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
+	hashReader, err := hash.NewReaderWithOpts(ctx, reader, hash.Options{
+		Size:       size,
+		MD5Hex:     md5hex,
+		SHA256Hex:  sha256hex,
+		ActualSize: actualSize,
+		DisableMD5: false,
+		ForceMD5:   nil,
+	})
 	if err != nil {
 		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
 		return
diff --git a/cmd/server-main.go b/cmd/server-main.go
index c9bd20493..d49cb7742 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -74,6 +74,12 @@ var ServerFlags = []cli.Flag{
 		EnvVar: "MINIO_LISTENERS",
 		Hidden: true,
 	},
+	cli.BoolFlag{
+		Name:   "pre-allocate",
+		Usage:  "Number of 1MiB sized buffers to pre-allocate. Default 2048",
+		EnvVar: "MINIO_PRE_ALLOCATE",
+		Hidden: true,
+	},
 	cli.StringFlag{
 		Name:  "console-address",
 		Usage: "bind to a specific ADDRESS:PORT for embedded Console UI, ADDRESS can be an IP or hostname",
diff --git a/internal/bpool/bpool.go b/internal/bpool/bpool.go
index 4e6621884..4f69f7f7a 100644
--- a/internal/bpool/bpool.go
+++ b/internal/bpool/bpool.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
+// Copyright (c) 2015-2023 MinIO, Inc.
 //
 // This file is part of MinIO Object Storage stack
 //
@@ -17,6 +17,8 @@
 
 package bpool
 
+import "github.com/klauspost/reedsolomon"
+
 // BytePoolCap implements a leaky pool of []byte in the form of a bounded channel.
 type BytePoolCap struct {
 	c    chan []byte
@@ -27,6 +29,12 @@ type BytePoolCap struct {
 // NewBytePoolCap creates a new BytePool bounded to the given maxSize, with new
 // byte arrays sized based on width.
 func NewBytePoolCap(maxSize int, width int, capwidth int) (bp *BytePoolCap) {
+	if capwidth > 0 && capwidth < 64 {
+		panic("buffer capped with smaller than 64 bytes is not supported")
+	}
+	if capwidth > 0 && width > capwidth {
+		panic("buffer length cannot be > capacity of the buffer")
+	}
 	return &BytePoolCap{
 		c: make(chan []byte, maxSize),
 		w: width,
@@ -34,18 +42,25 @@ func NewBytePoolCap(maxSize int, width int, capwidth int) (bp *BytePoolCap) {
 	}
 }
 
+// Populate - populates and pre-warms the byte pool, this function is non-blocking.
+func (bp *BytePoolCap) Populate() {
+	for _, buf := range reedsolomon.AllocAligned(cap(bp.c), bp.wcap) {
+		bp.Put(buf[:bp.w])
+	}
+}
+
 // Get gets a []byte from the BytePool, or creates a new one if none are
 // available in the pool.
 func (bp *BytePoolCap) Get() (b []byte) {
 	select {
 	case b = <-bp.c:
-		// reuse existing buffer
+		// reuse existing buffer
 	default:
-		// create new buffer
+		// create new aligned buffer
 		if bp.wcap > 0 {
-			b = make([]byte, bp.w, bp.wcap)
+			b = reedsolomon.AllocAligned(1, bp.wcap)[0][:bp.w]
 		} else {
-			b = make([]byte, bp.w)
+			b = reedsolomon.AllocAligned(1, bp.w)[0]
 		}
 	}
 	return
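
internal/bpool gains two behaviours: buffers are now allocated through reedsolomon.AllocAligned (on pool misses in Get as well as in the new Populate), and Populate pre-warms the bounded channel with cap(c) buffers so the first wave of writes does not pay allocation cost. The sketch below models only the pre-warm mechanics with plain make, since alignment is not needed to show how the channel fills; miniPool is an illustrative stand-in, not the BytePoolCap implementation.

```go
package main

import "fmt"

// miniPool sketches the Populate semantics added to BytePoolCap: fill the
// bounded channel up front so early Gets hit the pool instead of allocating.
// Alignment (reedsolomon.AllocAligned in the patch) is deliberately omitted.
type miniPool struct {
	c       chan []byte
	w, wcap int
}

func newMiniPool(n, width, capwidth int) *miniPool {
	if capwidth > 0 && width > capwidth {
		panic("buffer length cannot be > capacity of the buffer")
	}
	return &miniPool{c: make(chan []byte, n), w: width, wcap: capwidth}
}

// Populate pre-warms the pool; like the patch, it creates cap(c) buffers of
// capacity wcap and stores them sliced down to the working width w.
func (p *miniPool) Populate() {
	for i := 0; i < cap(p.c); i++ {
		p.Put(make([]byte, p.w, p.wcap))
	}
}

func (p *miniPool) Get() []byte {
	select {
	case b := <-p.c:
		return b
	default:
		return make([]byte, p.w, p.wcap)
	}
}

func (p *miniPool) Put(b []byte) {
	select {
	case p.c <- b:
	default: // pool already full, drop the buffer
	}
}

func main() {
	p := newMiniPool(4, 1024, 2048)
	fmt.Println("pooled before Populate:", len(p.c)) // 0
	p.Populate()
	fmt.Println("pooled after Populate:", len(p.c)) // 4
	b := p.Get()
	fmt.Println(len(b), cap(b)) // 1024 2048
}
```

The "non-blocking" claim in the Populate doc comment holds on the assumption that Put keeps its existing leaky, select-with-default behaviour, so the loop can never block on a full channel.
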
diff --git a/internal/bpool/bpool_test.go b/internal/bpool/bpool_test.go
index 2525cdb70..28fee5993 100644
--- a/internal/bpool/bpool_test.go
+++ b/internal/bpool/bpool_test.go
@@ -22,8 +22,8 @@ import "testing"
 // Tests - bytePool functionality.
 func TestBytePool(t *testing.T) {
 	size := 4
-	width := 10
-	capWidth := 16
+	width := 1024
+	capWidth := 2048
 
 	bufPool := NewBytePoolCap(size, width, capWidth)
 
@@ -43,7 +43,7 @@ func TestBytePool(t *testing.T) {
 		t.Fatalf("bytepool length invalid: got %v want %v", len(b), width)
 	}
 	if cap(b) != capWidth {
-		t.Fatalf("bytepool length invalid: got %v want %v", cap(b), capWidth)
+		t.Fatalf("bytepool cap invalid: got %v want %v", cap(b), capWidth)
 	}
 
 	bufPool.Put(b)
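
The test constants move from width 10 / cap 16 to 1024 / 2048, presumably because the new guards in NewBytePoolCap reject capacities under 64 bytes, which the old values would now trip. A stand-alone check of that contract is sketched below; the guard is restated locally because internal/bpool cannot be imported from outside the MinIO module.

```go
package main

import "fmt"

// newBytePoolCapGuard restates the two panics added to NewBytePoolCap in
// this patch, so the contract can be exercised outside the MinIO module.
func newBytePoolCapGuard(width, capwidth int) {
	if capwidth > 0 && capwidth < 64 {
		panic("buffer capped with smaller than 64 bytes is not supported")
	}
	if capwidth > 0 && width > capwidth {
		panic("buffer length cannot be > capacity of the buffer")
	}
}

// expectPanic reports whether f panics.
func expectPanic(f func()) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	f()
	return
}

func main() {
	fmt.Println(expectPanic(func() { newBytePoolCapGuard(10, 16) }))     // true: the old test values
	fmt.Println(expectPanic(func() { newBytePoolCapGuard(1024, 2048) })) // false: the new test values
	fmt.Println(expectPanic(func() { newBytePoolCapGuard(2048, 1024) })) // true: width exceeds cap
}
```
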