mirror of
https://github.com/minio/minio.git
synced 2025-01-12 15:33:22 -05:00
fix: re-use bytes.Buffer using sync.Pool (#11156)
This commit is contained in:
parent
bfb92a27b7
commit
5982965839
@ -113,6 +113,7 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache,
|
|||||||
// Use global context for this.
|
// Use global context for this.
|
||||||
err := objAPI.GetObject(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), 0, -1, w, "", ObjectOptions{})
|
err := objAPI.GetObject(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), 0, -1, w, "", ObjectOptions{})
|
||||||
logger.LogIf(ctx, w.CloseWithError(err))
|
logger.LogIf(ctx, w.CloseWithError(err))
|
||||||
|
wg.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
case ObjectNotFound:
|
case ObjectNotFound:
|
||||||
@ -125,7 +126,6 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache,
|
|||||||
}
|
}
|
||||||
return newBucketMetacache(bucket, false), err
|
return newBucketMetacache(bucket, false), err
|
||||||
}
|
}
|
||||||
wg.Wait()
|
|
||||||
if decErr != nil {
|
if decErr != nil {
|
||||||
if errors.Is(err, context.Canceled) {
|
if errors.Is(err, context.Canceled) {
|
||||||
return newBucketMetacache(bucket, false), err
|
return newBucketMetacache(bucket, false), err
|
||||||
|
@ -432,7 +432,11 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
|||||||
|
|
||||||
// We got a stream to start at.
|
// We got a stream to start at.
|
||||||
loadedPart := 0
|
loadedPart := 0
|
||||||
var buf bytes.Buffer
|
buf := bufferPool.Get().(*bytes.Buffer)
|
||||||
|
defer func() {
|
||||||
|
buf.Reset()
|
||||||
|
bufferPool.Put(buf)
|
||||||
|
}()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@ -482,7 +486,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, &buf, fi, metaArr, onlineDisks)
|
err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, buf, fi, metaArr, onlineDisks)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
|
switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
|
||||||
case ObjectNotFound:
|
case ObjectNotFound:
|
||||||
@ -498,7 +502,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
|||||||
return entries, err
|
return entries, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tmp, err := newMetacacheReader(&buf)
|
tmp, err := newMetacacheReader(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return entries, err
|
return entries, err
|
||||||
}
|
}
|
||||||
|
@ -745,6 +745,12 @@ type metacacheBlockWriter struct {
|
|||||||
blockEntries int
|
blockEntries int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var bufferPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return new(bytes.Buffer)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// newMetacacheBlockWriter provides a streaming block writer.
|
// newMetacacheBlockWriter provides a streaming block writer.
|
||||||
// Each block is the size of the capacity of the input channel.
|
// Each block is the size of the capacity of the input channel.
|
||||||
// The caller should close to indicate the stream has ended.
|
// The caller should close to indicate the stream has ended.
|
||||||
@ -755,12 +761,15 @@ func newMetacacheBlockWriter(in <-chan metaCacheEntry, nextBlock func(b *metacac
|
|||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
var current metacacheBlock
|
var current metacacheBlock
|
||||||
var n int
|
var n int
|
||||||
var buf bytes.Buffer
|
buf := bufferPool.Get().(*bytes.Buffer)
|
||||||
block := newMetacacheWriter(&buf, 1<<20)
|
defer func() {
|
||||||
|
buf.Reset()
|
||||||
|
bufferPool.Put(buf)
|
||||||
|
}()
|
||||||
|
block := newMetacacheWriter(buf, 1<<20)
|
||||||
defer block.Close()
|
defer block.Close()
|
||||||
finishBlock := func() {
|
finishBlock := func() {
|
||||||
err := block.Close()
|
if err := block.Close(); err != nil {
|
||||||
if err != nil {
|
|
||||||
w.streamErr = err
|
w.streamErr = err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -769,7 +778,7 @@ func newMetacacheBlockWriter(in <-chan metaCacheEntry, nextBlock func(b *metacac
|
|||||||
// Prepare for next
|
// Prepare for next
|
||||||
current.n++
|
current.n++
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
block.Reset(&buf)
|
block.Reset(buf)
|
||||||
current.First = ""
|
current.First = ""
|
||||||
}
|
}
|
||||||
for o := range in {
|
for o := range in {
|
||||||
|
Loading…
Reference in New Issue
Block a user