Mirror of https://github.com/minio/minio.git
Optimize memory allocation during erasure-read by using a temporary buffer pool (#2259)

* XL/erasure-read: optimize memory allocation during erasure-read by using a temporary buffer pool. With this change, the buffers needed by erasureReadFile during GetObject are allocated only once per call and reused for every block.
parent 04f90bd463
commit 043ddbd834
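To put the commit message in concrete terms: previously each block read allocated a fresh chunk-sized buffer per disk, so a large GetObject performed on the order of blocks × disks allocations; with the pool, at most len(disks) chunk-sized slices exist for the whole call and are reused across blocks. A rough, illustrative calculation follows (the object size, block size and disk count are assumptions chosen for the example, not values taken from the commit):

package main

import "fmt"

func main() {
	// Illustrative figures only: a 1 GiB object stored with a 10 MiB erasure
	// block size and read across 16 disks.
	const (
		objectSize = int64(1) << 30
		blockSize  = int64(10) << 20
		disks      = int64(16)
	)
	blocks := (objectSize + blockSize - 1) / blockSize

	// Before: roughly one chunk-sized buffer per disk per block.
	fmt.Println("buffers allocated before ≈", blocks*disks) // ≈ 1648

	// After: the byte pool caps live chunk buffers at the number of disks.
	fmt.Println("buffers allocated after ≤", disks) // 16
}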
@@ -17,13 +17,13 @@
 package main
 
 import (
-	"bytes"
 	"encoding/hex"
 	"errors"
 	"io"
 	"sync"
 
 	"github.com/klauspost/reedsolomon"
+	"github.com/minio/minio/pkg/bpool"
 )
 
 // isSuccessDecodeBlocks - do we have all the blocks to be
@@ -112,7 +112,7 @@ func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDis
 }
 
 // parallelRead - reads chunks in parallel from the disks specified in []readDisks.
-func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []StorageAPI, enBlocks [][]byte, blockOffset int64, curChunkSize int64, bitRotVerify func(diskIndex int) bool) {
+func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []StorageAPI, enBlocks [][]byte, blockOffset int64, curChunkSize int64, bitRotVerify func(diskIndex int) bool, pool *bpool.BytePool) {
 	// WaitGroup to synchronise the read go-routines.
 	wg := &sync.WaitGroup{}
 
@@ -133,21 +133,20 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St
 				return
 			}
 
-			// Chunk writer.
-			chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
-
-			// CopyN - copies until current chunk size.
-			err := copyN(chunkWriter, readDisks[index], volume, path, blockOffset, curChunkSize)
+			buf, err := pool.Get()
 			if err != nil {
-				// So that we don't read from this disk for the next block.
+				errorIf(err, "unable to get buffer from byte pool")
 				orderedDisks[index] = nil
 				return
 			}
+			buf = buf[:curChunkSize]
 
-			// Copy the read blocks.
-			enBlocks[index] = chunkWriter.Bytes()
-
-			// Successfully read.
+			_, err = readDisks[index].ReadFile(volume, path, blockOffset, buf)
+			if err != nil {
+				orderedDisks[index] = nil
+				return
+			}
+			enBlocks[index] = buf
 		}(index)
 	}
 
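The core of the change is visible in the hunk above: the per-read bytes.Buffer and copyN are replaced by a slice borrowed from the pool, and buf = buf[:curChunkSize] merely re-slices that pooled memory, so a chunk shorter than the pooled slice (typically in the last block) still reuses the same backing array instead of allocating. A tiny standalone sketch of why re-slicing is allocation-free (variable names here are illustrative, not from the commit):

package main

import "fmt"

func main() {
	// A pooled buffer sized for the largest possible chunk (8 bytes here).
	backing := make([]byte, 8)

	// A smaller final chunk only needs 5 bytes: re-slice instead of reallocating.
	chunk := backing[:5]
	chunk[0] = 0xAB

	// Both slice headers share the same backing array, so no copy and no new
	// allocation took place.
	fmt.Println(len(chunk), cap(chunk), backing[0]) // 5 8 171
}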
@@ -160,7 +159,7 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St
 // are decoded into a data block. Data block is trimmed for given offset and length,
 // then written to given writer. This function also supports bit-rot detection by
 // verifying checksum of individual block's checksum.
-func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string) (int64, error) {
+func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string, pool *bpool.BytePool) (int64, error) {
 	// Offset and length cannot be negative.
 	if offset < 0 || length < 0 {
 		return 0, errUnexpected
@@ -171,6 +170,9 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 		return 0, errUnexpected
 	}
 
+	// chunkSize is the amount of data that needs to be read from each disk at a time.
+	chunkSize := getChunkSize(blockSize, dataBlocks)
+
 	// bitRotVerify verifies if the file on a particular disk doesn't have bitrot
 	// by verifying the hash of the contents of the file.
 	bitRotVerify := func() func(diskIndex int) bool {
@@ -193,9 +195,6 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 	// Total bytes written to writer
 	bytesWritten := int64(0)
 
-	// chunkSize is the amount of data that needs to be read from each disk at a time.
-	chunkSize := getChunkSize(blockSize, dataBlocks)
-
 	startBlock := offset / blockSize
 	endBlock := (offset + length) / blockSize
 
@@ -209,6 +208,10 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 	// need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number
 	// of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
 	for block := startBlock; block <= endBlock; block++ {
+		// Mark all buffers as unused at the start of the loop so that the buffers
+		// can be reused.
+		pool.Reset()
+
 		// Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk.
 		enBlocks := make([][]byte, len(disks))
 
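Putting the erasureReadFile hunks together, the pool follows a simple lifecycle: it is created once per call by the caller (as the test and GetObject hunks further down show), each parallelRead goroutine borrows one slice with Get(), and Reset() at the top of the block loop marks every slice reusable for the next block. Below is a hedged sketch of that lifecycle using only the bpool API introduced by this commit; the disk/block loop is simplified and sequential, unlike the real erasure code:

package main

import (
	"fmt"

	"github.com/minio/minio/pkg/bpool"
)

func main() {
	const disks = 4
	chunkSize := int64(16)

	// One pool per read: at most `disks` chunk-sized slices are ever created.
	pool := bpool.NewBytePool(chunkSize, disks)

	for block := 0; block < 3; block++ {
		// Mark every slice reusable before reading the next block.
		pool.Reset()

		for d := 0; d < disks; d++ {
			buf, err := pool.Get()
			if err != nil {
				fmt.Println("pool exhausted:", err)
				return
			}
			// The real code reads this disk's chunk into buf here; the same
			// memory is handed out again on the next block.
			_ = buf
		}
	}
	fmt.Println("read 3 blocks while allocating only", disks, "buffers")
}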
@@ -239,7 +242,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 			return bytesWritten, err
 		}
 		// Issue a parallel read across the disks specified in readDisks.
-		parallelRead(volume, path, readDisks, disks, enBlocks, blockOffset, curChunkSize, bitRotVerify)
+		parallelRead(volume, path, readDisks, disks, enBlocks, blockOffset, curChunkSize, bitRotVerify, pool)
 		if isSuccessDecodeBlocks(enBlocks, dataBlocks) {
 			// If enough blocks are available to do rs.Reconstruct()
 			break
@@ -21,6 +21,8 @@ import (
 	"math/rand"
 	"testing"
 	"time"
 
+	"github.com/minio/minio/pkg/bpool"
 )
 
 import "reflect"
@@ -249,8 +251,13 @@ func TestErasureReadFileDiskFail(t *testing.T) {
 		t.Errorf("erasureCreateFile returned %d, expected %d", size, length)
 	}
 
+	// create byte pool which will be used by erasureReadFile for
+	// reading from disks and erasure decoding.
+	chunkSize := getChunkSize(blockSize, dataBlocks)
+	pool := bpool.NewBytePool(chunkSize, len(disks))
+
 	buf := &bytes.Buffer{}
-	size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums)
+	size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool)
 	if err != nil {
 		t.Error(err)
 	}
@@ -263,7 +270,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
 	disks[5] = ReadDiskDown{disks[5].(*posix)}
 
 	buf.Reset()
-	size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums)
+	size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool)
 	if err != nil {
 		t.Error(err)
 	}
@@ -278,7 +285,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
 	disks[11] = ReadDiskDown{disks[11].(*posix)}
 
 	buf.Reset()
-	size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums)
+	size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool)
 	if err != nil {
 		t.Error(err)
 	}
@@ -289,7 +296,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
 	// 1 more disk down. 7 disks down in total. Read should fail.
 	disks[12] = ReadDiskDown{disks[12].(*posix)}
 	buf.Reset()
-	size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums)
+	size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool)
 	if err != errXLReadQuorum {
 		t.Fatal("expected errXLReadQuorum error")
 	}
@@ -347,11 +354,14 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
 		{length - blockSize - 1, blockSize},
 		{length - blockSize - 1, blockSize + 1},
 	}
+	chunkSize := getChunkSize(blockSize, dataBlocks)
+	pool := bpool.NewBytePool(chunkSize, len(disks))
+
 	// Compare the data read from file with "data" byte array.
 	for i, testCase := range testCases {
 		expected := data[testCase.offset:(testCase.offset + testCase.length)]
 		buf := &bytes.Buffer{}
-		size, err = erasureReadFile(buf, disks, "testbucket", "testobject", testCase.offset, testCase.length, length, blockSize, dataBlocks, parityBlocks, checkSums)
+		size, err = erasureReadFile(buf, disks, "testbucket", "testobject", testCase.offset, testCase.length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool)
 		if err != nil {
 			t.Error(err)
 			continue
@@ -405,6 +415,11 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) {
 	// To generate random offset/length.
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 
+	// create pool buffer which will be used by erasureReadFile for
+	// reading from disks and erasure decoding.
+	chunkSize := getChunkSize(blockSize, dataBlocks)
+	pool := bpool.NewBytePool(chunkSize, len(disks))
+
 	buf := &bytes.Buffer{}
 
 	// Verify erasureReadFile() for random offsets and lengths.
@@ -414,7 +429,7 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) {
 
 		expected := data[offset : offset+readLen]
 
-		size, err = erasureReadFile(buf, disks, "testbucket", "testobject", offset, readLen, length, blockSize, dataBlocks, parityBlocks, checkSums)
+		size, err = erasureReadFile(buf, disks, "testbucket", "testobject", offset, readLen, length, blockSize, dataBlocks, parityBlocks, checkSums, pool)
 		if err != nil {
 			t.Fatal(err, offset, readLen)
 		}
pkg/bpool/bpool.go (new file, 69 lines)
@@ -0,0 +1,69 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package bpool implements a fixed size pool of byte slices.
+package bpool
+
+import (
+	"errors"
+	"sync"
+)
+
+// ErrBpoolNoFree - Normally this error should never be returned, this error
+// indicates a bug in the package consumer.
+var ErrBpoolNoFree = errors.New("no free byte slice in pool")
+
+// BytePool - temporary pool of byte slices.
+type BytePool struct {
+	buf  [][]byte // array of byte slices
+	used []bool   // indicates if a buf[i] is in use
+	size int64    // size of buf[i]
+	mu   sync.Mutex
+}
+
+// Get - Returns an unused byte slice.
+func (b *BytePool) Get() (buf []byte, err error) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	for i := 0; i < len(b.used); i++ {
+		if !b.used[i] {
+			b.used[i] = true
+			if b.buf[i] == nil {
+				b.buf[i] = make([]byte, b.size)
+			}
+			return b.buf[i], nil
+		}
+	}
+	return nil, ErrBpoolNoFree
+}
+
+// Reset - Marks all slices as unused.
+func (b *BytePool) Reset() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	for i := 0; i < len(b.used); i++ {
+		b.used[i] = false
+	}
+}
+
+// NewBytePool - Returns new pool.
+// size - length of each slice.
+// n - number of slices in the pool.
+func NewBytePool(size int64, n int) *BytePool {
+	used := make([]bool, n)
+	buf := make([][]byte, n)
+	return &BytePool{buf, used, size, sync.Mutex{}}
+}
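For readers who have not seen the new package before, here is a minimal standalone exercise of BytePool's three entry points (NewBytePool, Get, Reset); the printed values follow directly from the code above:

package main

import (
	"fmt"

	"github.com/minio/minio/pkg/bpool"
)

func main() {
	// A pool of two 4-byte slices.
	pool := bpool.NewBytePool(4, 2)

	a, _ := pool.Get()
	b, _ := pool.Get()
	fmt.Println(len(a), len(b)) // 4 4

	// A third Get fails: the pool is fixed-size, it never grows.
	if _, err := pool.Get(); err != nil {
		fmt.Println(err) // no free byte slice in pool
	}

	// Reset marks every slice unused; subsequent Gets hand back the same memory.
	pool.Reset()
	c, _ := pool.Get()
	fmt.Println(len(c)) // 4
}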
pkg/bpool/bpool_test.go (new file, 53 lines)
@@ -0,0 +1,53 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package bpool
+
+import "testing"
+
+func TestBytePool(t *testing.T) {
+	size := int64(10)
+	n := 16
+	pool := NewBytePool(size, n)
+	enBlocks := make([][]byte, n)
+
+	// Allocates all the 16 byte slices in the pool.
+	alloc := func() {
+		for i := range enBlocks {
+			var err error
+			enBlocks[i], err = pool.Get()
+			if err != nil {
+				t.Fatal("expected nil, got", err)
+			}
+			// Make sure the slice length is as expected.
+			if len(enBlocks[i]) != int(size) {
+				t.Fatalf("expected size %d, got %d", len(enBlocks[i]), size)
+			}
+		}
+	}
+
+	// Allocate everything in the pool.
+	alloc()
+	// Any Get() will fail when the pool does not have any free buffer.
+	_, err := pool.Get()
+	if err == nil {
+		t.Fatalf("expected %s, got nil", err)
+	}
+	// Reset - so that all the buffers are marked as unused.
+	pool.Reset()
+	// Allocation of all the buffers in the pool should succeed now.
+	alloc()
+}
@@ -26,6 +26,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/minio/minio/pkg/bpool"
 	"github.com/minio/minio/pkg/mimedb"
 	"github.com/minio/minio/pkg/objcache"
 )
@@ -153,6 +154,10 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 	}
 
 	totalBytesRead := int64(0)
+
+	chunkSize := getChunkSize(xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
+	pool := bpool.NewBytePool(chunkSize, len(onlineDisks))
+
 	// Read from all parts.
 	for ; partIndex <= lastPartIndex; partIndex++ {
 		if length == totalBytesRead {
@@ -183,7 +188,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 		}
 
 		// Start reading the part name.
-		n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partOffset, readSize, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, checkSums)
+		n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partOffset, readSize, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, checkSums, pool)
 		if err != nil {
 			return toObjectErr(err, bucket, object)
 		}
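The GetObject wiring also puts a clear ceiling on memory per request: the pool holds at most len(onlineDisks) slices of chunkSize bytes and lives only for the duration of the call. Assuming getChunkSize is roughly a ceiling division of the erasure block size by the number of data blocks (an assumption about that pre-existing helper, which is not shown in this diff), the bound works out as in the sketch below; the block size and disk counts are illustrative:

package main

import "fmt"

// chunkSizeOf mirrors the assumed behaviour of the existing getChunkSize
// helper (ceiling division of the erasure block size across the data blocks).
// The helper itself is not part of this commit, so this is an approximation.
func chunkSizeOf(blockSize int64, dataBlocks int) int64 {
	return (blockSize + int64(dataBlocks) - 1) / int64(dataBlocks)
}

func main() {
	// Illustrative XL layout: 10 MiB erasure blocks, 8 data + 8 parity disks.
	blockSize := int64(10) << 20
	dataBlocks, onlineDisks := 8, 16

	chunkSize := chunkSizeOf(blockSize, dataBlocks)
	fmt.Println("chunk size per disk:", chunkSize)                 // 1310720 bytes
	fmt.Println("pool upper bound:", chunkSize*int64(onlineDisks)) // 20971520 bytes (~20 MiB)
}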