mirror of https://github.com/minio/minio.git
allow pre-allocating buffers to reduce frequent GCs during growth (#18686)
This PR also increases per node bpool memory from 1024 entries to 2048 entries; along with that, it also moves the byte pool centrally instead of being per pool.
This commit is contained in:
parent
56b7045c20
commit
7c948adf88
|
@ -355,10 +355,8 @@ func buildServerCtxt(ctx *cli.Context, ctxt *serverCtxt) (err error) {
|
|||
}
|
||||
|
||||
// Check "no-compat" flag from command line argument.
|
||||
ctxt.StrictS3Compat = true
|
||||
if ctx.IsSet("no-compat") || ctx.GlobalIsSet("no-compat") {
|
||||
ctxt.StrictS3Compat = false
|
||||
}
|
||||
ctxt.StrictS3Compat = !(ctx.IsSet("no-compat") || ctx.GlobalIsSet("no-compat"))
|
||||
ctxt.PreAllocate = ctx.IsSet("pre-allocate") || ctx.GlobalIsSet("pre-allocate")
|
||||
|
||||
switch {
|
||||
case ctx.IsSet("config-dir"):
|
||||
|
|
|
@ -662,12 +662,12 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
// Account for padding and forced compression overhead and encryption.
|
||||
buffer = make([]byte, data.ActualSize()+256+32+32, data.ActualSize()*2+512)
|
||||
} else {
|
||||
buffer = er.bp.Get()
|
||||
defer er.bp.Put(buffer)
|
||||
buffer = globalBytePoolCap.Get()
|
||||
defer globalBytePoolCap.Put(buffer)
|
||||
}
|
||||
case size >= fi.Erasure.BlockSize:
|
||||
buffer = er.bp.Get()
|
||||
defer er.bp.Put(buffer)
|
||||
buffer = globalBytePoolCap.Get()
|
||||
defer globalBytePoolCap.Put(buffer)
|
||||
case size < fi.Erasure.BlockSize:
|
||||
// No need to allocate fully fi.Erasure.BlockSize buffer if the incoming data is smaller.
|
||||
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
|
||||
|
@ -688,10 +688,10 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
if data.Size() > bigFileThreshold {
|
||||
// Add input readahead.
|
||||
// We use 2 buffers, so we always have a full buffer of input.
|
||||
bufA := er.bp.Get()
|
||||
bufB := er.bp.Get()
|
||||
defer er.bp.Put(bufA)
|
||||
defer er.bp.Put(bufB)
|
||||
bufA := globalBytePoolCap.Get()
|
||||
bufB := globalBytePoolCap.Get()
|
||||
defer globalBytePoolCap.Put(bufA)
|
||||
defer globalBytePoolCap.Put(bufB)
|
||||
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
|
||||
if err == nil {
|
||||
toEncode = ra
|
||||
|
|
|
@ -1127,8 +1127,8 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
|
|||
case size == 0:
|
||||
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
|
||||
case size >= fi.Erasure.BlockSize:
|
||||
buffer = er.bp.Get()
|
||||
defer er.bp.Put(buffer)
|
||||
buffer = globalBytePoolCap.Get()
|
||||
defer globalBytePoolCap.Put(buffer)
|
||||
case size < fi.Erasure.BlockSize:
|
||||
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
|
||||
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
|
||||
|
@ -1378,8 +1378,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||
case size == 0:
|
||||
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
|
||||
case size >= fi.Erasure.BlockSize || size == -1:
|
||||
buffer = er.bp.Get()
|
||||
defer er.bp.Put(buffer)
|
||||
buffer = globalBytePoolCap.Get()
|
||||
defer globalBytePoolCap.Put(buffer)
|
||||
case size < fi.Erasure.BlockSize:
|
||||
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
|
||||
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
|
||||
|
@ -1438,10 +1438,10 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||
toEncode := io.Reader(data)
|
||||
if data.Size() > bigFileThreshold {
|
||||
// We use 2 buffers, so we always have a full buffer of input.
|
||||
bufA := er.bp.Get()
|
||||
bufB := er.bp.Get()
|
||||
defer er.bp.Put(bufA)
|
||||
defer er.bp.Put(bufB)
|
||||
bufA := globalBytePoolCap.Get()
|
||||
bufB := globalBytePoolCap.Get()
|
||||
defer globalBytePoolCap.Put(bufA)
|
||||
defer globalBytePoolCap.Put(bufB)
|
||||
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
|
||||
if err == nil {
|
||||
toEncode = ra
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
"github.com/minio/minio-go/v7/pkg/s3utils"
|
||||
"github.com/minio/minio-go/v7/pkg/set"
|
||||
"github.com/minio/minio-go/v7/pkg/tags"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/minio/minio/internal/config/storageclass"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/v2/sync/errgroup"
|
||||
|
@ -82,6 +83,21 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
|
|||
}
|
||||
)
|
||||
|
||||
// Maximum number of reusable buffers per node at any given point in time.
|
||||
n := 1024 // single node single/multiple drives set this to 1024 entries
|
||||
|
||||
if globalIsDistErasure {
|
||||
n = 2048
|
||||
}
|
||||
|
||||
// Initialize byte pool once for all sets, bpool size is set to
|
||||
// setCount * setDriveCount with each memory upto blockSizeV2.
|
||||
globalBytePoolCap = bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2)
|
||||
|
||||
if globalServerCtxt.PreAllocate {
|
||||
globalBytePoolCap.Populate()
|
||||
}
|
||||
|
||||
var localDrives []StorageAPI
|
||||
local := endpointServerPools.FirstLocal()
|
||||
for i, ep := range endpointServerPools {
|
||||
|
|
|
@ -36,7 +36,6 @@ import (
|
|||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio-go/v7/pkg/set"
|
||||
"github.com/minio/minio-go/v7/pkg/tags"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/minio/minio/internal/dsync"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/v2/console"
|
||||
|
@ -369,21 +368,6 @@ func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks [
|
|||
|
||||
mutex := newNSLock(globalIsDistErasure)
|
||||
|
||||
// Number of buffers, max 2GB
|
||||
n := (2 * humanize.GiByte) / (blockSizeV2 * 2)
|
||||
|
||||
// Initialize byte pool once for all sets, bpool size is set to
|
||||
// setCount * setDriveCount with each memory upto blockSizeV2.
|
||||
bp := bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2)
|
||||
|
||||
// Initialize byte pool for all sets, bpool size is set to
|
||||
// setCount * setDriveCount with each memory upto blockSizeV1
|
||||
//
|
||||
// Number of buffers, max 10GiB
|
||||
m := (10 * humanize.GiByte) / (blockSizeV1 * 2)
|
||||
|
||||
bpOld := bpool.NewBytePoolCap(m, blockSizeV1, blockSizeV1*2)
|
||||
|
||||
for i := 0; i < setCount; i++ {
|
||||
s.erasureDisks[i] = make([]StorageAPI, setDriveCount)
|
||||
}
|
||||
|
@ -459,8 +443,6 @@ func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks [
|
|||
getLockers: s.GetLockers(i),
|
||||
getEndpoints: s.GetEndpoints(i),
|
||||
nsMutex: mutex,
|
||||
bp: bp,
|
||||
bpOld: bpOld,
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/minio/minio/internal/dsync"
|
||||
xioutil "github.com/minio/minio/internal/ioutil"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
|
@ -65,13 +64,6 @@ type erasureObjects struct {
|
|||
|
||||
// Locker mutex map.
|
||||
nsMutex *nsLockMap
|
||||
|
||||
// Byte pools used for temporary i/o buffers.
|
||||
bp *bpool.BytePoolCap
|
||||
|
||||
// Byte pools used for temporary i/o buffers,
|
||||
// legacy objects.
|
||||
bpOld *bpool.BytePoolCap
|
||||
}
|
||||
|
||||
// NewNSLock - initialize a new namespace RWLocker instance.
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/set"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/minio/minio/internal/bucket/bandwidth"
|
||||
"github.com/minio/minio/internal/config"
|
||||
"github.com/minio/minio/internal/handlers"
|
||||
|
@ -141,6 +142,7 @@ type serverCtxt struct {
|
|||
JSON, Quiet bool
|
||||
Anonymous bool
|
||||
StrictS3Compat bool
|
||||
PreAllocate bool
|
||||
Addr, ConsoleAddr string
|
||||
ConfigDir, CertsDir string
|
||||
configDirSet, certsDirSet bool
|
||||
|
@ -225,6 +227,7 @@ var (
|
|||
globalBucketMonitor *bandwidth.Monitor
|
||||
globalPolicySys *PolicySys
|
||||
globalIAMSys *IAMSys
|
||||
globalBytePoolCap *bpool.BytePoolCap
|
||||
|
||||
globalLifecycleSys *LifecycleSys
|
||||
globalBucketSSEConfigSys *BucketSSEConfigSys
|
||||
|
|
|
@ -1982,20 +1982,21 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
|||
md5hex = "" // Do not try to verify the content.
|
||||
sha256hex = ""
|
||||
}
|
||||
var hashReader *hash.Reader
|
||||
// Optimization: If SSE-KMS and SSE-C did not request Content-Md5. Use uuid as etag
|
||||
if !etag.ContentMD5Requested(r.Header) && (crypto.S3KMS.IsRequested(r.Header) || crypto.SSEC.IsRequested(r.Header)) {
|
||||
hashReader, err = hash.NewReaderWithOpts(ctx, reader, hash.Options{
|
||||
Size: size,
|
||||
MD5Hex: md5hex,
|
||||
SHA256Hex: sha256hex,
|
||||
ActualSize: actualSize,
|
||||
DisableMD5: false,
|
||||
ForceMD5: mustGetUUIDBytes(),
|
||||
})
|
||||
} else {
|
||||
hashReader, err = hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
|
||||
|
||||
var forceMD5 []byte
|
||||
// Optimization: If SSE-KMS and SSE-C did not request Content-Md5. Use uuid as etag. Optionally enable this also
|
||||
// for server that is started with `--no-compat`.
|
||||
if !etag.ContentMD5Requested(r.Header) && (crypto.S3KMS.IsRequested(r.Header) || crypto.SSEC.IsRequested(r.Header) || !globalServerCtxt.StrictS3Compat) {
|
||||
forceMD5 = mustGetUUIDBytes()
|
||||
}
|
||||
hashReader, err := hash.NewReaderWithOpts(ctx, reader, hash.Options{
|
||||
Size: size,
|
||||
MD5Hex: md5hex,
|
||||
SHA256Hex: sha256hex,
|
||||
ActualSize: actualSize,
|
||||
DisableMD5: false,
|
||||
ForceMD5: forceMD5,
|
||||
})
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
|
|
|
@ -728,7 +728,14 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
|||
sha256hex = ""
|
||||
}
|
||||
|
||||
hashReader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
|
||||
hashReader, err := hash.NewReaderWithOpts(ctx, reader, hash.Options{
|
||||
Size: size,
|
||||
MD5Hex: md5hex,
|
||||
SHA256Hex: sha256hex,
|
||||
ActualSize: actualSize,
|
||||
DisableMD5: false,
|
||||
ForceMD5: nil,
|
||||
})
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
|
|
|
@ -74,6 +74,12 @@ var ServerFlags = []cli.Flag{
|
|||
EnvVar: "MINIO_LISTENERS",
|
||||
Hidden: true,
|
||||
},
|
||||
cli.BoolFlag{
|
||||
Name: "pre-allocate",
|
||||
Usage: "Number of 1MiB sized buffers to pre-allocate. Default 2048",
|
||||
EnvVar: "MINIO_PRE_ALLOCATE",
|
||||
Hidden: true,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "console-address",
|
||||
Usage: "bind to a specific ADDRESS:PORT for embedded Console UI, ADDRESS can be an IP or hostname",
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright (c) 2015-2021 MinIO, Inc.
|
||||
// Copyright (c) 2015-2023 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
|
@ -17,6 +17,8 @@
|
|||
|
||||
package bpool
|
||||
|
||||
import "github.com/klauspost/reedsolomon"
|
||||
|
||||
// BytePoolCap implements a leaky pool of []byte in the form of a bounded channel.
|
||||
type BytePoolCap struct {
|
||||
c chan []byte
|
||||
|
@ -27,6 +29,12 @@ type BytePoolCap struct {
|
|||
// NewBytePoolCap creates a new BytePool bounded to the given maxSize, with new
|
||||
// byte arrays sized based on width.
|
||||
func NewBytePoolCap(maxSize int, width int, capwidth int) (bp *BytePoolCap) {
|
||||
if capwidth > 0 && capwidth < 64 {
|
||||
panic("buffer capped with smaller than 64 bytes is not supported")
|
||||
}
|
||||
if capwidth > 0 && width > capwidth {
|
||||
panic("buffer length cannot be > capacity of the buffer")
|
||||
}
|
||||
return &BytePoolCap{
|
||||
c: make(chan []byte, maxSize),
|
||||
w: width,
|
||||
|
@ -34,18 +42,25 @@ func NewBytePoolCap(maxSize int, width int, capwidth int) (bp *BytePoolCap) {
|
|||
}
|
||||
}
|
||||
|
||||
// Populate - populates and pre-warms the byte pool, this function is non-blocking.
|
||||
func (bp *BytePoolCap) Populate() {
|
||||
for _, buf := range reedsolomon.AllocAligned(cap(bp.c), bp.wcap) {
|
||||
bp.Put(buf[:bp.w])
|
||||
}
|
||||
}
|
||||
|
||||
// Get gets a []byte from the BytePool, or creates a new one if none are
|
||||
// available in the pool.
|
||||
func (bp *BytePoolCap) Get() (b []byte) {
|
||||
select {
|
||||
case b = <-bp.c:
|
||||
// reuse existing buffer
|
||||
// reuse existing buffer
|
||||
default:
|
||||
// create new buffer
|
||||
// create new aligned buffer
|
||||
if bp.wcap > 0 {
|
||||
b = make([]byte, bp.w, bp.wcap)
|
||||
b = reedsolomon.AllocAligned(1, bp.wcap)[0][:bp.w]
|
||||
} else {
|
||||
b = make([]byte, bp.w)
|
||||
b = reedsolomon.AllocAligned(1, bp.w)[0]
|
||||
}
|
||||
}
|
||||
return
|
||||
|
|
|
@ -22,8 +22,8 @@ import "testing"
|
|||
// Tests - bytePool functionality.
|
||||
func TestBytePool(t *testing.T) {
|
||||
size := 4
|
||||
width := 10
|
||||
capWidth := 16
|
||||
width := 1024
|
||||
capWidth := 2048
|
||||
|
||||
bufPool := NewBytePoolCap(size, width, capWidth)
|
||||
|
||||
|
@ -43,7 +43,7 @@ func TestBytePool(t *testing.T) {
|
|||
t.Fatalf("bytepool length invalid: got %v want %v", len(b), width)
|
||||
}
|
||||
if cap(b) != capWidth {
|
||||
t.Fatalf("bytepool length invalid: got %v want %v", cap(b), capWidth)
|
||||
t.Fatalf("bytepool cap invalid: got %v want %v", cap(b), capWidth)
|
||||
}
|
||||
|
||||
bufPool.Put(b)
|
||||
|
|
Loading…
Reference in New Issue