Reuse more metadata buffers (#12955)

Reuse metadata buffers when no longer referenced.

Takes care of most of the happy paths.
This commit is contained in:
Klaus Post 2021-08-13 20:39:27 +02:00 committed by GitHub
parent 24722ddd02
commit 7d8413a589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 75 additions and 7 deletions

View File

@@ -984,6 +984,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
if err != nil && err != io.EOF {
return loi, err
}
defer merged.truncate(0) // Release when returning
if versionMarker == "" {
o := listPathOptions{Marker: marker}
// If we are not looking for a specific version skip it.
@@ -1040,6 +1041,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
}
merged.forwardPast(opts.Marker)
defer merged.truncate(0) // Release when returning
// Default is recursive, if delimiter is set then list non recursive.
objects := merged.fileInfos(bucket, prefix, delimiter)

View File

@@ -37,6 +37,9 @@ type metaCacheEntry struct {
// cached contains the metadata if decoded.
cached *FileInfo
// Indicates the entry can be reused and only one reference to metadata is expected.
reusable bool
}
// isDir returns if the entry is representing a prefix directory.
@@ -345,6 +348,8 @@ type metaCacheEntriesSorted struct {
o metaCacheEntries
// list id is not serialized
listID string
// Reuse buffers
reuse bool
}
// shallowClone will create a shallow clone of the array objects,
@@ -490,6 +495,15 @@ func (m *metaCacheEntriesSorted) forwardTo(s string) {
idx := sort.Search(len(m.o), func(i int) bool {
return m.o[i].name >= s
})
if m.reuse {
for i, entry := range m.o[:idx] {
if len(entry.metadata) >= metaDataReadDefault && len(entry.metadata) < metaDataReadDefault*4 {
metaDataPool.Put(entry.metadata)
}
m.o[i].metadata = nil
}
}
m.o = m.o[idx:]
}
@@ -501,6 +515,14 @@ func (m *metaCacheEntriesSorted) forwardPast(s string) {
idx := sort.Search(len(m.o), func(i int) bool {
return m.o[i].name > s
})
if m.reuse {
for i, entry := range m.o[:idx] {
if len(entry.metadata) >= metaDataReadDefault && len(entry.metadata) < metaDataReadDefault*4 {
metaDataPool.Put(entry.metadata)
}
m.o[i].metadata = nil
}
}
m.o = m.o[idx:]
}
@@ -715,6 +737,14 @@ func (m *metaCacheEntriesSorted) truncate(n int) {
return
}
if len(m.o) > n {
if m.reuse {
for i, entry := range m.o[n:] {
if len(entry.metadata) >= metaDataReadDefault && len(entry.metadata) < metaDataReadDefault*4 {
metaDataPool.Put(entry.metadata)
}
m.o[n+i].metadata = nil
}
}
m.o = m.o[:n]
}
}

View File

@@ -162,6 +162,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
if o.pool < len(z.serverPools) && o.set < len(z.serverPools[o.pool].sets) {
o.debugln("Resuming", o)
entries, err = z.serverPools[o.pool].sets[o.set].streamMetadataParts(ctx, *o)
entries.reuse = true // We read from stream and are not sharing results.
if err == nil {
return entries, nil
}
@@ -217,6 +218,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
if listErr != nil && !errors.Is(listErr, context.Canceled) {
return entries, listErr
}
entries.reuse = true
truncated := entries.len() > o.Limit || err == nil
entries.truncate(o.Limit)
if !o.Transient && truncated {
@@ -363,13 +365,33 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions
o.debugln("listAndSave: listing", o.ID, "finished with ", err)
}(*o)
// Keep track of when we return since we no longer have to send entries to output.
var funcReturned bool
var funcReturnedMu sync.Mutex
defer func() {
funcReturnedMu.Lock()
funcReturned = true
funcReturnedMu.Unlock()
}()
// Write listing to results and saver.
go func() {
var returned bool
for entry := range inCh {
outCh <- entry
if !returned {
funcReturnedMu.Lock()
returned = funcReturned
funcReturnedMu.Unlock()
outCh <- entry
if returned {
close(outCh)
}
}
entry.reusable = returned
saveCh <- entry
}
close(outCh)
if !returned {
close(outCh)
}
close(saveCh)
}()

View File

@@ -142,8 +142,10 @@ func (w *metacacheWriter) write(objs ...metaCacheEntry) error {
if err != nil {
return err
}
if w.reuseBlocks && cap(o.metadata) >= metaDataReadDefault {
metaDataPool.Put(o.metadata)
if w.reuseBlocks || o.reusable {
if cap(o.metadata) >= metaDataReadDefault && cap(o.metadata) < metaDataReadDefault*4 {
metaDataPool.Put(o.metadata)
}
}
}
@@ -358,10 +360,14 @@ func (r *metacacheReader) next() (metaCacheEntry, error) {
r.err = err
return m, err
}
m.metadata, err = r.mr.ReadBytes(nil)
m.metadata, err = r.mr.ReadBytes(metaDataPool.Get().([]byte)[:0])
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
if len(m.metadata) == 0 && cap(m.metadata) >= metaDataReadDefault {
metaDataPool.Put(m.metadata)
m.metadata = nil
}
r.err = err
return m, err
}
@@ -514,13 +520,17 @@ func (r *metacacheReader) readN(n int, inclDeleted, inclDirs bool, prefix string
r.mr.R.Skip(1)
return metaCacheEntriesSorted{o: res}, io.EOF
}
if meta.metadata, err = r.mr.ReadBytes(nil); err != nil {
if meta.metadata, err = r.mr.ReadBytes(metaDataPool.Get().([]byte)[:0]); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
r.err = err
return metaCacheEntriesSorted{o: res}, err
}
if len(meta.metadata) == 0 && cap(meta.metadata) >= metaDataReadDefault {
metaDataPool.Put(meta.metadata)
meta.metadata = nil
}
if !inclDirs && meta.isDir() {
continue
}
@@ -569,13 +579,17 @@ func (r *metacacheReader) readAll(ctx context.Context, dst chan<- metaCacheEntry
r.err = err
return err
}
if meta.metadata, err = r.mr.ReadBytes(nil); err != nil {
if meta.metadata, err = r.mr.ReadBytes(metaDataPool.Get().([]byte)[:0]); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
r.err = err
return err
}
if len(meta.metadata) == 0 && cap(meta.metadata) >= metaDataReadDefault {
metaDataPool.Put(meta.metadata)
meta.metadata = nil
}
select {
case <-ctx.Done():
r.err = ctx.Err()