reduce allocations on multi-disk clusters (#12311)

Multi-disk clusters initialize buffer pools per disk, which is expensive and of little benefit on a running server instance, because it prevents buffers from being reused across erasure sets. This change moves the pools to the package level so buffers are shared at the drive level across all sets, which can save a significant amount of memory on setups with many drives. (A minimal sketch of the pattern follows below.)

Parent: d610578d84
Commit: 2daba018d6
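In outline, the change swaps per-instance pools for package-level ones. A minimal sketch of the two shapes, assuming a made-up blockSize (the real pools and their disk.AlignedBlock sizes appear in the diff below):

    package main

    import (
    	"fmt"
    	"sync"
    )

    const blockSize = 32 << 10 // hypothetical buffer size

    // Before: every disk instance owns a pool, so N drives hold N pools
    // and a buffer freed on one drive can never serve another.
    type diskStorage struct {
    	pool sync.Pool // per-instance; multiplied by drive count
    }

    // After: one package-level pool shared by every drive in the process.
    var sharedPool = sync.Pool{
    	New: func() interface{} {
    		b := make([]byte, blockSize)
    		return &b // store a pointer so Put does not allocate
    	},
    }

    func main() {
    	bufp := sharedPool.Get().(*[]byte)
    	defer sharedPool.Put(bufp)
    	fmt.Println(len(*bufp)) // 32768
    }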
@@ -164,7 +164,6 @@ func bitrotVerify(r io.Reader, wantSize, partSize int64, algo BitrotAlgorithm, w
 	h := algo.New()
 	hashBuf := make([]byte, h.Size())
-	buf := make([]byte, shardSize)
 	left := wantSize
 
 	// Calculate the size of the bitrot file and compare
 	// it with the actual file size.
@@ -173,6 +172,9 @@ func bitrotVerify(r io.Reader, wantSize, partSize int64, algo BitrotAlgorithm, w
 		return errFileCorrupt
 	}
 
+	bufp := xlPoolSmall.Get().(*[]byte)
+	defer xlPoolSmall.Put(bufp)
+
 	for left > 0 {
 		// Read expected hash...
 		h.Reset()
@@ -186,13 +188,15 @@ func bitrotVerify(r io.Reader, wantSize, partSize int64, algo BitrotAlgorithm, w
 		if left < shardSize {
 			shardSize = left
 		}
-		read, err := io.CopyBuffer(h, io.LimitReader(r, shardSize), buf)
+
+		read, err := io.CopyBuffer(h, io.LimitReader(r, shardSize), *bufp)
 		if err != nil {
 			// Read's failed for object with right size, at different offsets.
-			return err
+			return errFileCorrupt
 		}
+
 		left -= read
-		if !bytes.Equal(h.Sum(nil), hashBuf) {
+		if !bytes.Equal(h.Sum(nil), hashBuf[:n]) {
 			return errFileCorrupt
 		}
 	}
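Pieced together, the loop these three hunks modify has roughly this shape (a condensed sketch under assumed names, not MinIO's exact function: the pool is passed in and must hold *[]byte values, and a plain error stands in for errFileCorrupt):

    package bitrotsketch

    import (
    	"bytes"
    	"errors"
    	"hash"
    	"io"
    	"sync"
    )

    // verifyStream re-hashes a stream of (hash, shard) pairs, reusing one
    // pooled buffer for every shard instead of allocating per call.
    func verifyStream(r io.Reader, h hash.Hash, shardSize, left int64, pool *sync.Pool) error {
    	hashBuf := make([]byte, h.Size())
    	bufp := pool.Get().(*[]byte)
    	defer pool.Put(bufp)
    	for left > 0 {
    		h.Reset()
    		n, err := io.ReadFull(r, hashBuf) // expected hash comes first
    		if err != nil {
    			return err
    		}
    		left -= int64(n)
    		if left < shardSize {
    			shardSize = left // final shard may be short
    		}
    		read, err := io.CopyBuffer(h, io.LimitReader(r, shardSize), *bufp)
    		if err != nil {
    			return err
    		}
    		left -= read
    		if !bytes.Equal(h.Sum(nil), hashBuf[:n]) {
    			return errors.New("bitrot detected")
    		}
    	}
    	return nil
    }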
@@ -261,7 +261,7 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
 		// Always check data, if we got it.
 		if (len(meta.Data) > 0 || meta.Size == 0) && len(meta.Parts) > 0 {
 			checksumInfo := meta.Erasure.GetChecksumInfo(meta.Parts[0].Number)
-			dataErrs[i] = bitrotVerify(bytes.NewBuffer(meta.Data),
+			dataErrs[i] = bitrotVerify(bytes.NewReader(meta.Data),
 				int64(len(meta.Data)),
 				meta.Erasure.ShardFileSize(meta.Size),
 				checksumInfo.Algorithm,
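The bytes.NewBuffer → bytes.NewReader switch is a small idiomatic fix: bytes.Buffer is a read-write container, while bytes.Reader is a leaner read-only view that also implements io.Seeker and io.ReaderAt. Since verification only reads meta.Data, the Reader states the intent and avoids the heavier type:

    package main

    import (
    	"bytes"
    	"io"
    	"os"
    )

    func main() {
    	data := []byte("shard bytes\n")

    	// Read-write container; writes would mutate the backing storage.
    	io.Copy(os.Stdout, bytes.NewBuffer(data))

    	// Read-only view; additionally seekable and usable via io.ReaderAt.
    	r := bytes.NewReader(data)
    	io.Copy(os.Stdout, r)
    }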
@@ -98,6 +98,27 @@ func isValidVolname(volname string) bool {
 	return true
 }
 
+var (
+	xlPoolReallyLarge = sync.Pool{
+		New: func() interface{} {
+			b := disk.AlignedBlock(blockSizeReallyLarge)
+			return &b
+		},
+	}
+	xlPoolLarge = sync.Pool{
+		New: func() interface{} {
+			b := disk.AlignedBlock(blockSizeLarge)
+			return &b
+		},
+	}
+	xlPoolSmall = sync.Pool{
+		New: func() interface{} {
+			b := disk.AlignedBlock(blockSizeSmall)
+			return &b
+		},
+	}
+)
+
 // xlStorage - implements StorageAPI interface.
 type xlStorage struct {
 	diskPath string
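Note that the pools store *[]byte rather than []byte. Converting a slice to interface{} on every Put copies the slice header to the heap (the pattern staticcheck flags as SA6002); round-tripping a pointer avoids that allocation. Call sites then follow the usual pair, as in this sketch (hypothetical helper, using the pools above):

    // withSmallBuf borrows a small aligned buffer for the duration of use.
    func withSmallBuf(use func(buf []byte)) {
    	bufp := xlPoolSmall.Get().(*[]byte)
    	defer xlPoolSmall.Put(bufp) // the same pointer returns to the pool
    	use(*bufp)
    }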
@@ -105,10 +126,6 @@ type xlStorage struct {
 
 	globalSync bool
 
-	poolReallyLarge sync.Pool
-	poolLarge       sync.Pool
-	poolSmall       sync.Pool
-
 	rootDisk bool
 
 	diskID string
@@ -254,26 +271,8 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
 	}
 
 	p := &xlStorage{
 		diskPath: path,
 		endpoint: ep,
-		poolReallyLarge: sync.Pool{
-			New: func() interface{} {
-				b := disk.AlignedBlock(blockSizeReallyLarge)
-				return &b
-			},
-		},
-		poolLarge: sync.Pool{
-			New: func() interface{} {
-				b := disk.AlignedBlock(blockSizeLarge)
-				return &b
-			},
-		},
-		poolSmall: sync.Pool{
-			New: func() interface{} {
-				b := disk.AlignedBlock(blockSizeSmall)
-				return &b
-			},
-		},
 		globalSync: env.Get(config.EnvFSOSync, config.EnableOff) == config.EnableOn,
 		ctx:        GlobalContext,
 		rootDisk:   rootDisk,
@@ -1323,9 +1322,9 @@ func (o *odirectReader) Read(buf []byte) (n int, err error) {
 	}
 	if o.buf == nil {
 		if o.smallFile {
-			o.bufp = o.s.poolSmall.Get().(*[]byte)
+			o.bufp = xlPoolSmall.Get().(*[]byte)
 		} else {
-			o.bufp = o.s.poolLarge.Get().(*[]byte)
+			o.bufp = xlPoolLarge.Get().(*[]byte)
 		}
 	}
 	if o.freshRead {
@@ -1367,9 +1366,9 @@ func (o *odirectReader) Read(buf []byte) (n int, err error) {
 // Close - Release the buffer and close the file.
 func (o *odirectReader) Close() error {
 	if o.smallFile {
-		o.s.poolSmall.Put(o.bufp)
+		xlPoolSmall.Put(o.bufp)
 	} else {
-		o.s.poolLarge.Put(o.bufp)
+		xlPoolLarge.Put(o.bufp)
 	}
 	return o.f.Close()
 }
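Both odirectReader hunks keep the existing lifecycle and only retarget it at the package-level pools: the buffer is acquired lazily on the first Read and released exactly once in Close. Schematically (hypothetical type, with the real O_DIRECT copy elided; pool names as in the diff, "os" import assumed):

    type pooledReader struct {
    	f    *os.File
    	bufp *[]byte
    }

    func (r *pooledReader) Read(p []byte) (int, error) {
    	if r.bufp == nil {
    		r.bufp = xlPoolLarge.Get().(*[]byte) // borrowed on first Read
    	}
    	// ... the real reader fills *r.bufp from r.f and copies into p ...
    	return r.f.Read(p)
    }

    func (r *pooledReader) Close() error {
    	if r.bufp != nil {
    		xlPoolLarge.Put(r.bufp) // returned exactly once, on Close
    		r.bufp = nil
    	}
    	return r.f.Close()
    }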
@@ -1554,11 +1553,11 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
 	var bufp *[]byte
 	if fileSize > 0 && fileSize >= reallyLargeFileThreshold {
 		// use a larger 4MiB buffer for really large streams.
-		bufp = s.poolReallyLarge.Get().(*[]byte)
-		defer s.poolReallyLarge.Put(bufp)
+		bufp = xlPoolReallyLarge.Get().(*[]byte)
+		defer xlPoolReallyLarge.Put(bufp)
 	} else {
-		bufp = s.poolLarge.Get().(*[]byte)
-		defer s.poolLarge.Put(bufp)
+		bufp = xlPoolLarge.Get().(*[]byte)
+		defer xlPoolLarge.Put(bufp)
 	}
 
 	written, err := xioutil.CopyAligned(w, r, *bufp, fileSize)
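Finally, CreateFile selects a pool tier from the declared stream size before streaming through xioutil.CopyAligned. The selection could be factored as a small helper (a sketch, not code from the commit; the threshold and pool names follow the diff):

    // acquireWriteBuf picks the 4 MiB tier for really large streams and
    // the default large tier otherwise, returning a paired release func.
    func acquireWriteBuf(fileSize int64) (bufp *[]byte, release func()) {
    	if fileSize > 0 && fileSize >= reallyLargeFileThreshold {
    		bufp = xlPoolReallyLarge.Get().(*[]byte)
    		return bufp, func() { xlPoolReallyLarge.Put(bufp) }
    	}
    	bufp = xlPoolLarge.Get().(*[]byte)
    	return bufp, func() { xlPoolLarge.Put(bufp) }
    }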