mirror of https://github.com/minio/minio.git
optimize memory allocs during reconstruct (#4964)
The reedsolomon library now avoids allocations during reconstruction. This change exploits that to reduce memory allocations and GC pressure during healing and reading.
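For context, a minimal runnable sketch of the library behavior this commit relies on, assuming github.com/klauspost/reedsolomon (the erasure-coding library minio uses): a shard truncated to zero length keeps its capacity, so Reconstruct can fill it in place instead of allocating a fresh buffer.

package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	enc, err := reedsolomon.New(4, 2) // 4 data + 2 parity shards
	if err != nil {
		panic(err)
	}

	// Allocate every shard buffer exactly once.
	shards := make([][]byte, 6)
	for i := range shards {
		shards[i] = make([]byte, 64)
	}
	for i := 0; i < 4; i++ {
		for j := range shards[i] {
			shards[i][j] = byte(i + 1)
		}
	}
	if err = enc.Encode(shards); err != nil {
		panic(err)
	}
	want := append([]byte(nil), shards[1]...)

	// Mark two shards missing by truncating to zero length. The buffers
	// keep their capacity, so Reconstruct fills them in place instead of
	// allocating replacements.
	shards[1] = shards[1][:0]
	shards[5] = shards[5][:0]
	if err = enc.Reconstruct(shards); err != nil {
		panic(err)
	}
	fmt.Println("shard recovered in place:", bytes.Equal(shards[1], want))
}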
committed by Dee Koder
parent 4879cd73f8
commit 02af37a394
@@ -70,6 +70,10 @@ func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string,
     // Scan part files on disk, block-by-block reconstruct it and
     // write to stale disks.
     chunksize := getChunkSize(blocksize, s.dataBlocks)
+    blocks := make([][]byte, len(s.disks))
+    for i := range blocks {
+        blocks[i] = make([]byte, chunksize)
+    }
     var chunkOffset, blockOffset int64
     for ; blockOffset < size; blockOffset += blocksize {
         // last iteration may have less than blocksize data
@@ -77,40 +81,35 @@ func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string,
         if size < blockOffset+blocksize {
             blocksize = size - blockOffset
             chunksize = getChunkSize(blocksize, s.dataBlocks)
+            for i := range blocks {
+                blocks[i] = blocks[i][:chunksize]
+            }
         }

         // read a chunk from each disk, until we have
         // `s.dataBlocks` number of chunks set to non-nil in
         // `blocks`
-        blocks := make([][]byte, len(s.disks))
-        var buffer []byte
         numReads := 0
         for i, disk := range s.disks {
             // skip reading from unavailable or stale disks
             if disk == nil || staleDisks[i] != nil {
+                blocks[i] = blocks[i][:0] // mark shard as missing
                 continue
             }
-            // allocate buffer only when needed - when
-            // reads fail, the buffer can be reused
-            if int64(len(buffer)) != chunksize {
-                buffer = make([]byte, chunksize)
-            }
-            _, err = disk.ReadFile(volume, path, chunkOffset, buffer, verifiers[i])
+            _, err = disk.ReadFile(volume, path, chunkOffset, blocks[i], verifiers[i])
             if err != nil {
                 // LOG FIXME: add a conditional log
                 // for read failures, once per-disk
                 // per-function-invocation.
+                blocks[i] = blocks[i][:0] // mark shard as missing
                 continue
             }
-
-            // read was successful, so set the buffer as
-            // blocks[i], and reset buffer to nil to force
-            // allocation on next iteration
-            blocks[i], buffer = buffer, nil
-
             numReads++
             if numReads == s.dataBlocks {
                 // we have enough data to reconstruct
+                // mark all other blocks as missing
+                for j := i + 1; j < len(blocks); j++ {
+                    blocks[j] = blocks[j][:0] // mark shard as missing
+                }
                 break
             }
         }
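The `blocks[i] = blocks[i][:0]` marker works because truncating a Go slice keeps its backing array and capacity; a tiny sketch of the mechanics:

package main

import "fmt"

func main() {
	buf := make([]byte, 1024)
	buf = buf[:0]                   // mark as "missing": length 0, capacity still 1024
	fmt.Println(len(buf), cap(buf)) // 0 1024
	buf = buf[:512]                 // reslice back up without a new allocation
	fmt.Println(len(buf), cap(buf)) // 512 1024
}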
@@ -18,14 +18,12 @@ package cmd

 import (
     "io"
-
-    "github.com/minio/minio/pkg/bpool"
 )

 // ReadFile reads as much data as requested from the file under the given volume and path and writes the data to the provided writer.
 // The algorithm and the keys/checksums are used to verify the integrity of the given file. ReadFile will read data from the given offset
 // up to the given length. If parts of the file are corrupted ReadFile tries to reconstruct the data.
-func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset, length int64, totalLength int64, checksums [][]byte, algorithm BitrotAlgorithm, blocksize int64, pool *bpool.BytePool) (f ErasureFileInfo, err error) {
+func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset, length int64, totalLength int64, checksums [][]byte, algorithm BitrotAlgorithm, blocksize int64) (f ErasureFileInfo, err error) {
     if offset < 0 || length < 0 {
         return f, traceError(errUnexpected)
     }
@@ -53,15 +51,20 @@ func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset,
     chunksize := getChunkSize(blocksize, s.dataBlocks)

     blocks := make([][]byte, len(s.disks))
+    for i := range blocks {
+        blocks[i] = make([]byte, chunksize)
+    }
     for off := offset / blocksize; length > 0; off++ {
         blockOffset := off * chunksize
-        pool.Reset()

         if currentBlock := (offset + f.Size) / blocksize; currentBlock == lastBlock {
             blocksize = totalLength % blocksize
             chunksize = getChunkSize(blocksize, s.dataBlocks)
+            for i := range blocks {
+                blocks[i] = blocks[i][:chunksize]
+            }
         }
-        err = s.readConcurrent(volume, path, blockOffset, chunksize, blocks, verifiers, errChans, pool)
+        err = s.readConcurrent(volume, path, blockOffset, blocks, verifiers, errChans)
        if err != nil {
             return f, traceError(errXLReadQuorum)
         }
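getChunkSize itself is not shown in this diff; assuming it is the usual ceiling division of a block across the data disks, the final (shorter) block of a file yields a smaller chunk, which is why both ReadFile and HealFile reslice the preallocated buffers with blocks[i][:chunksize] rather than allocating new ones. A sketch of that assumed computation:

// Hypothetical analogue of getChunkSize: each block of `blocksize` bytes
// is striped across `dataBlocks` disks, rounding up.
func chunkSize(blocksize, dataBlocks int64) int64 {
	return (blocksize + dataBlocks - 1) / dataBlocks
}

// e.g. chunkSize(10*1024*1024, 6) == 1747627 bytes per disk, while a short
// final block of 1 MiB shrinks it to chunkSize(1*1024*1024, 6) == 174763.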
@@ -92,7 +95,7 @@ func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset,
 func erasureCountMissingBlocks(blocks [][]byte, limit int) int {
     missing := 0
     for i := range blocks[:limit] {
-        if blocks[i] == nil {
+        if len(blocks[i]) == 0 {
             missing++
         }
     }
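The quorum counter now tests len(blocks[i]) == 0 instead of blocks[i] == nil; a quick sketch of why the length check subsumes the old one (a nil slice also has length zero, while a truncated shard is non-nil):

package main

import "fmt"

func main() {
	var nilShard []byte              // never allocated
	truncated := make([]byte, 8)[:0] // allocated, then marked missing

	fmt.Println(nilShard == nil, len(nilShard))   // true 0
	fmt.Println(truncated == nil, len(truncated)) // false 0 — the old nil check would miss this
}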
@@ -101,15 +104,8 @@ func erasureCountMissingBlocks(blocks [][]byte, limit int) int {

 // readConcurrent reads all requested data concurrently from the disks into blocks. It returns an error if
 // too many disks failed while reading.
-func (s *ErasureStorage) readConcurrent(volume, path string, offset int64, length int64, blocks [][]byte, verifiers []*BitrotVerifier, errChans []chan error, pool *bpool.BytePool) (err error) {
+func (s *ErasureStorage) readConcurrent(volume, path string, offset int64, blocks [][]byte, verifiers []*BitrotVerifier, errChans []chan error) (err error) {
     errs := make([]error, len(s.disks))
-    for i := range blocks {
-        blocks[i], err = pool.Get()
-        if err != nil {
-            return traceErrorf("failed to get new buffer from pool: %v", err)
-        }
-        blocks[i] = blocks[i][:length]
-    }
-
     erasureReadBlocksConcurrent(s.disks[:s.dataBlocks], volume, path, offset, blocks[:s.dataBlocks], verifiers[:s.dataBlocks], errs[:s.dataBlocks], errChans[:s.dataBlocks])
     missingDataBlocks := erasureCountMissingBlocks(blocks, s.dataBlocks)
@@ -145,7 +141,7 @@ func erasureReadBlocksConcurrent(disks []StorageAPI, volume, path string, offset
         errors[i] = <-errChans[i] // blocks until the go routine 'i' is done - no data race
         if errors[i] != nil {
             disks[i] = OfflineDisk
-            blocks[i] = nil
+            blocks[i] = blocks[i][:0] // mark shard as missing
         }
     }
 }
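erasureReadBlocksConcurrent itself is mostly unchanged; here is a simplified, hypothetical analogue of its fan-out pattern (the names and io.ReaderAt sources are illustrative, not the repo's types): one goroutine plus one buffered error channel per disk, with the zero-length marker applied on failure.

package main

import (
	"fmt"
	"io"
	"strings"
)

// readShardsConcurrent launches one goroutine per source; a failed or
// short read truncates its shard to zero length to mark it missing,
// mirroring the diff above.
func readShardsConcurrent(srcs []io.ReaderAt, offset int64, blocks [][]byte) []error {
	errs := make([]error, len(srcs))
	errChans := make([]chan error, len(srcs))
	for i := range srcs {
		errChans[i] = make(chan error, 1)
		go func(i int) {
			_, err := srcs[i].ReadAt(blocks[i], offset)
			errChans[i] <- err
		}(i)
	}
	for i := range errs {
		if errs[i] = <-errChans[i]; errs[i] != nil {
			blocks[i] = blocks[i][:0] // mark shard as missing
		}
	}
	return errs
}

func main() {
	srcs := []io.ReaderAt{
		strings.NewReader("shard-0"), // healthy source
		strings.NewReader("x"),       // too short: read fails with io.EOF
	}
	blocks := [][]byte{make([]byte, 7), make([]byte, 7)}
	errs := readShardsConcurrent(srcs, 0, blocks)
	fmt.Printf("errs=%v healthy=%q missing-len=%d\n", errs, blocks[0], len(blocks[1]))
}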
@@ -24,7 +24,6 @@ import (
     "testing"

     humanize "github.com/dustin/go-humanize"
-    "github.com/minio/minio/pkg/bpool"
 )

 func (d badDisk) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
@@ -108,9 +107,8 @@ func TestErasureReadFile(t *testing.T) {
             setup.Remove()
             t.Fatalf("Test %d: failed to create erasure test file: %v", i, err)
         }
-        pool := bpool.NewBytePool(getChunkSize(test.blocksize, test.dataBlocks), len(storage.disks))
         writer := bytes.NewBuffer(nil)
-        readInfo, err := storage.ReadFile(writer, "testbucket", "object", test.offset, test.length, test.data, file.Checksums, test.algorithm, test.blocksize, pool)
+        readInfo, err := storage.ReadFile(writer, "testbucket", "object", test.offset, test.length, test.data, file.Checksums, test.algorithm, test.blocksize)
         if err != nil && !test.shouldFail {
             t.Errorf("Test %d: should pass but failed with: %v", i, err)
         }
@@ -136,7 +134,7 @@ func TestErasureReadFile(t *testing.T) {
         if test.offDisks > 0 {
             storage.disks[0] = OfflineDisk
         }
-        readInfo, err = storage.ReadFile(writer, "testbucket", "object", test.offset, test.length, test.data, file.Checksums, test.algorithm, test.blocksize, pool)
+        readInfo, err = storage.ReadFile(writer, "testbucket", "object", test.offset, test.length, test.data, file.Checksums, test.algorithm, test.blocksize)
         if err != nil && !test.shouldFailQuorum {
             t.Errorf("Test %d: should pass but failed with: %v", i, err)
         }
@@ -204,11 +202,6 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) {
     // To generate random offset/length.
     r := rand.New(rand.NewSource(UTCNow().UnixNano()))

-    // create pool buffer which will be used by erasureReadFile for
-    // reading from disks and erasure decoding.
-    chunkSize := getChunkSize(blockSize, dataBlocks)
-    pool := bpool.NewBytePool(chunkSize, len(storage.disks))
-
     buf := &bytes.Buffer{}

     // Verify erasureReadFile() for random offsets and lengths.
@@ -218,7 +211,7 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) {

         expected := data[offset : offset+readLen]

-        _, err = storage.ReadFile(buf, "testbucket", "testobject", offset, readLen, length, file.Checksums, DefaultBitrotAlgorithm, blockSize, pool)
+        _, err = storage.ReadFile(buf, "testbucket", "testobject", offset, readLen, length, file.Checksums, DefaultBitrotAlgorithm, blockSize)
         if err != nil {
             t.Fatal(err, offset, readLen)
         }
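A hypothetical micro-benchmark (it would live in a _test.go file; it is not part of this commit) for observing the effect: run with go test -bench . -benchmem, and reconstructing into shards that kept their capacity should report near-zero allocs/op.

package main

import (
	"testing"

	"github.com/klauspost/reedsolomon"
)

// BenchmarkReconstruct reconstructs one missing data shard per iteration
// into a buffer that kept its capacity, so no per-iteration allocation
// should be needed.
func BenchmarkReconstruct(b *testing.B) {
	enc, err := reedsolomon.New(4, 2)
	if err != nil {
		b.Fatal(err)
	}
	shards := make([][]byte, 6)
	for i := range shards {
		shards[i] = make([]byte, 64*1024)
	}
	if err = enc.Encode(shards); err != nil {
		b.Fatal(err)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		shards[0] = shards[0][:0] // drop one shard, keep its capacity
		if err = enc.Reconstruct(shards); err != nil {
			b.Fatal(err)
		}
	}
}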
@@ -24,7 +24,6 @@ import (
     "strings"
     "sync"

-    "github.com/minio/minio/pkg/bpool"
     "github.com/minio/minio/pkg/mimedb"
     "github.com/minio/minio/pkg/objcache"
 )
@@ -242,10 +241,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
     }

     var totalBytesRead int64
-
-    chunkSize := getChunkSize(xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
-    pool := bpool.NewBytePool(chunkSize, len(onlineDisks))
-
     storage, err := NewErasureStorage(onlineDisks, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks)
     if err != nil {
         return toObjectErr(err, bucket, object)
@@ -276,7 +271,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
             checksums[index] = checksumInfo.Hash
         }

-        file, err := storage.ReadFile(mw, bucket, pathJoin(object, partName), partOffset, readSize, partSize, checksums, algorithm, xlMeta.Erasure.BlockSize, pool)
+        file, err := storage.ReadFile(mw, bucket, pathJoin(object, partName), partOffset, readSize, partSize, checksums, algorithm, xlMeta.Erasure.BlockSize)
         if err != nil {
             errorIf(err, "Unable to read %s of the object `%s/%s`.", partName, bucket, object)
             return toObjectErr(err, bucket, object)