mirror of
https://github.com/minio/minio.git
synced 2025-11-10 14:09:48 -05:00
reduce memory usage in metacache reader (#12334)
This commit is contained in:
@@ -84,7 +84,9 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
|
||||
if write < int64(len(block)) {
|
||||
n, err := io.Copy(dst, bytes.NewReader(block[:write]))
|
||||
if err != nil {
|
||||
if err != io.ErrClosedPipe {
|
||||
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
|
||||
// The reader pipe might be closed at ListObjects io.EOF ignore it.
|
||||
if err != io.ErrClosedPipe && err != io.EOF {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
return 0, err
|
||||
@@ -97,7 +99,8 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
|
||||
n, err := io.Copy(dst, bytes.NewReader(block))
|
||||
if err != nil {
|
||||
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
|
||||
if err != io.ErrClosedPipe {
|
||||
// The reader pipe might be closed at ListObjects io.EOF ignore it.
|
||||
if err != io.ErrClosedPipe && err != io.EOF {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
return 0, err
|
||||
|
||||
@@ -418,11 +418,6 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
||||
|
||||
// We got a stream to start at.
|
||||
loadedPart := 0
|
||||
buf := bufferPool.Get().(*bytes.Buffer)
|
||||
defer func() {
|
||||
buf.Reset()
|
||||
bufferPool.Put(buf)
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -471,9 +466,27 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
||||
}
|
||||
}
|
||||
}
|
||||
buf.Reset()
|
||||
err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, buf, fi, metaArr, onlineDisks)
|
||||
if err != nil {
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
werr := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0,
|
||||
fi.Size, pw, fi, metaArr, onlineDisks)
|
||||
pw.CloseWithError(werr)
|
||||
}()
|
||||
|
||||
tmp := newMetacacheReader(pr)
|
||||
e, err := tmp.filter(o)
|
||||
pr.CloseWithError(err)
|
||||
entries.o = append(entries.o, e.o...)
|
||||
if o.Limit > 0 && entries.len() > o.Limit {
|
||||
entries.truncate(o.Limit)
|
||||
return entries, nil
|
||||
}
|
||||
if err == nil {
|
||||
// We stopped within the listing, we are done for now...
|
||||
return entries, nil
|
||||
}
|
||||
if err != nil && err.Error() != io.EOF.Error() {
|
||||
switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
|
||||
case ObjectNotFound:
|
||||
retries++
|
||||
@@ -488,24 +501,6 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
||||
return entries, err
|
||||
}
|
||||
}
|
||||
tmp, err := newMetacacheReader(buf)
|
||||
if err != nil {
|
||||
return entries, err
|
||||
}
|
||||
e, err := tmp.filter(o)
|
||||
entries.o = append(entries.o, e.o...)
|
||||
if o.Limit > 0 && entries.len() > o.Limit {
|
||||
entries.truncate(o.Limit)
|
||||
return entries, nil
|
||||
}
|
||||
if err == nil {
|
||||
// We stopped within the listing, we are done for now...
|
||||
return entries, nil
|
||||
}
|
||||
if !errors.Is(err, io.EOF) {
|
||||
logger.LogIf(ctx, err)
|
||||
return entries, err
|
||||
}
|
||||
|
||||
// We finished at the end of the block.
|
||||
// And should not expect any more results.
|
||||
@@ -824,10 +819,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
|
||||
// Make sure we close the pipe so blocked writes doesn't stay around.
|
||||
defer r.CloseWithError(context.Canceled)
|
||||
|
||||
readers[i], err = newMetacacheReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readers[i] = newMetacacheReader(r)
|
||||
d := disks[i]
|
||||
|
||||
// Send request to each disk.
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -30,6 +29,7 @@ import (
|
||||
"github.com/klauspost/compress/s2"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
"github.com/valyala/bytebufferpool"
|
||||
)
|
||||
|
||||
// metadata stream format:
|
||||
@@ -246,11 +246,11 @@ type metacacheReader struct {
|
||||
|
||||
// newMetacacheReader creates a new cache reader.
|
||||
// Nothing will be read from the stream yet.
|
||||
func newMetacacheReader(r io.Reader) (*metacacheReader, error) {
|
||||
func newMetacacheReader(r io.Reader) *metacacheReader {
|
||||
dec := s2DecPool.Get().(*s2.Reader)
|
||||
dec.Reset(r)
|
||||
mr := msgp.NewReader(dec)
|
||||
m := metacacheReader{
|
||||
return &metacacheReader{
|
||||
mr: mr,
|
||||
closer: func() {
|
||||
dec.Reset(nil)
|
||||
@@ -269,7 +269,6 @@ func newMetacacheReader(r io.Reader) (*metacacheReader, error) {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
return &m, nil
|
||||
}
|
||||
|
||||
func (r *metacacheReader) checkInit() {
|
||||
@@ -747,12 +746,6 @@ type metacacheBlockWriter struct {
|
||||
blockEntries int
|
||||
}
|
||||
|
||||
var bufferPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return new(bytes.Buffer)
|
||||
},
|
||||
}
|
||||
|
||||
// newMetacacheBlockWriter provides a streaming block writer.
|
||||
// Each block is the size of the capacity of the input channel.
|
||||
// The caller should close to indicate the stream has ended.
|
||||
@@ -763,11 +756,13 @@ func newMetacacheBlockWriter(in <-chan metaCacheEntry, nextBlock func(b *metacac
|
||||
defer w.wg.Done()
|
||||
var current metacacheBlock
|
||||
var n int
|
||||
buf := bufferPool.Get().(*bytes.Buffer)
|
||||
|
||||
buf := bytebufferpool.Get()
|
||||
defer func() {
|
||||
buf.Reset()
|
||||
bufferPool.Put(buf)
|
||||
bytebufferpool.Put(buf)
|
||||
}()
|
||||
|
||||
block := newMetacacheWriter(buf, 1<<20)
|
||||
defer block.Close()
|
||||
finishBlock := func() {
|
||||
|
||||
@@ -34,11 +34,7 @@ func loadMetacacheSample(t testing.TB) *metacacheReader {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r, err := newMetacacheReader(bytes.NewReader(b))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return r
|
||||
return newMetacacheReader(bytes.NewReader(b))
|
||||
}
|
||||
|
||||
func loadMetacacheSampleEntries(t testing.TB) metaCacheEntriesSorted {
|
||||
@@ -388,10 +384,7 @@ func Test_newMetacacheStream(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err = newMetacacheReader(&buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r = newMetacacheReader(&buf)
|
||||
defer r.Close()
|
||||
names, err := r.readNames(-1)
|
||||
if err != io.EOF {
|
||||
|
||||
@@ -29,30 +29,33 @@ import (
|
||||
// handle all cases where we have known types of errors returned by
|
||||
// underlying storage layer.
|
||||
func toObjectErr(err error, params ...string) error {
|
||||
switch err {
|
||||
case errVolumeNotFound:
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
switch err.Error() {
|
||||
case errVolumeNotFound.Error():
|
||||
apiErr := BucketNotFound{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
}
|
||||
return apiErr
|
||||
case errVolumeNotEmpty:
|
||||
case errVolumeNotEmpty.Error():
|
||||
apiErr := BucketNotEmpty{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
}
|
||||
return apiErr
|
||||
case errVolumeExists:
|
||||
case errVolumeExists.Error():
|
||||
apiErr := BucketExists{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
}
|
||||
return apiErr
|
||||
case errDiskFull:
|
||||
case errDiskFull.Error():
|
||||
return StorageFull{}
|
||||
case errTooManyOpenFiles:
|
||||
case errTooManyOpenFiles.Error():
|
||||
return SlowDown{}
|
||||
case errFileAccessDenied:
|
||||
case errFileAccessDenied.Error():
|
||||
apiErr := PrefixAccessDenied{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -61,7 +64,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case errFileParentIsFile:
|
||||
case errFileParentIsFile.Error():
|
||||
apiErr := ParentIsObject{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -70,7 +73,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case errIsNotRegular:
|
||||
case errIsNotRegular.Error():
|
||||
apiErr := ObjectExistsAsDirectory{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -79,7 +82,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case errFileVersionNotFound:
|
||||
case errFileVersionNotFound.Error():
|
||||
apiErr := VersionNotFound{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -91,7 +94,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.VersionID = params[2]
|
||||
}
|
||||
return apiErr
|
||||
case errMethodNotAllowed:
|
||||
case errMethodNotAllowed.Error():
|
||||
apiErr := MethodNotAllowed{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -100,7 +103,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case errFileNotFound:
|
||||
case errFileNotFound.Error():
|
||||
apiErr := ObjectNotFound{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -109,7 +112,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case errUploadIDNotFound:
|
||||
case errUploadIDNotFound.Error():
|
||||
apiErr := InvalidUploadID{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -121,7 +124,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.UploadID = params[2]
|
||||
}
|
||||
return apiErr
|
||||
case errFileNameTooLong:
|
||||
case errFileNameTooLong.Error():
|
||||
apiErr := ObjectNameInvalid{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -130,7 +133,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case errDataTooLarge:
|
||||
case errDataTooLarge.Error():
|
||||
apiErr := ObjectTooLarge{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -139,7 +142,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case errDataTooSmall:
|
||||
case errDataTooSmall.Error():
|
||||
apiErr := ObjectTooSmall{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -148,7 +151,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case errErasureReadQuorum:
|
||||
case errErasureReadQuorum.Error():
|
||||
apiErr := InsufficientReadQuorum{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -157,7 +160,7 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case errErasureWriteQuorum:
|
||||
case errErasureWriteQuorum.Error():
|
||||
apiErr := InsufficientWriteQuorum{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
@@ -166,9 +169,9 @@ func toObjectErr(err error, params ...string) error {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case io.ErrUnexpectedEOF, io.ErrShortWrite:
|
||||
case io.ErrUnexpectedEOF.Error(), io.ErrShortWrite.Error():
|
||||
return IncompleteBody{}
|
||||
case context.Canceled, context.DeadlineExceeded:
|
||||
case context.Canceled.Error(), context.DeadlineExceeded.Error():
|
||||
return IncompleteBody{}
|
||||
}
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user