From 7d8413a589d7763f293c0d60bf2b9f170c869c0e Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 13 Aug 2021 20:39:27 +0200 Subject: [PATCH] Reuse more metadata buffers (#12955) Reuse metadata buffers when no longer referenced. Takes care of most of the happy paths. --- cmd/erasure-server-pool.go | 2 ++ cmd/metacache-entries.go | 30 ++++++++++++++++++++++++++++++ cmd/metacache-server-pool.go | 26 ++++++++++++++++++++++++-- cmd/metacache-stream.go | 24 +++++++++++++++++++----- 4 files changed, 75 insertions(+), 7 deletions(-) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 0f3b7326b..b72745763 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -984,6 +984,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre if err != nil && err != io.EOF { return loi, err } + defer merged.truncate(0) // Release when returning if versionMarker == "" { o := listPathOptions{Marker: marker} // If we are not looking for a specific version skip it. @@ -1040,6 +1041,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma } merged.forwardPast(opts.Marker) + defer merged.truncate(0) // Release when returning // Default is recursive, if delimiter is set then list non recursive. objects := merged.fileInfos(bucket, prefix, delimiter) diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index 1f99fece5..4e1639f47 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -37,6 +37,9 @@ type metaCacheEntry struct { // cached contains the metadata if decoded. cached *FileInfo + + // Indicates the entry can be reused and only one reference to metadata is expected. + reusable bool } // isDir returns if the entry is representing a prefix directory. @@ -345,6 +348,8 @@ type metaCacheEntriesSorted struct { o metaCacheEntries // list id is not serialized listID string + // Reuse buffers + reuse bool } // shallowClone will create a shallow clone of the array objects, @@ -490,6 +495,15 @@ func (m *metaCacheEntriesSorted) forwardTo(s string) { idx := sort.Search(len(m.o), func(i int) bool { return m.o[i].name >= s }) + if m.reuse { + for i, entry := range m.o[:idx] { + if len(entry.metadata) >= metaDataReadDefault && len(entry.metadata) < metaDataReadDefault*4 { + metaDataPool.Put(entry.metadata) + } + m.o[i].metadata = nil + } + } + m.o = m.o[idx:] } @@ -501,6 +515,14 @@ func (m *metaCacheEntriesSorted) forwardPast(s string) { idx := sort.Search(len(m.o), func(i int) bool { return m.o[i].name > s }) + if m.reuse { + for i, entry := range m.o[:idx] { + if len(entry.metadata) >= metaDataReadDefault && len(entry.metadata) < metaDataReadDefault*4 { + metaDataPool.Put(entry.metadata) + } + m.o[i].metadata = nil + } + } m.o = m.o[idx:] } @@ -715,6 +737,14 @@ func (m *metaCacheEntriesSorted) truncate(n int) { return } if len(m.o) > n { + if m.reuse { + for i, entry := range m.o[n:] { + if len(entry.metadata) >= metaDataReadDefault && len(entry.metadata) < metaDataReadDefault*4 { + metaDataPool.Put(entry.metadata) + } + m.o[n+i].metadata = nil + } + } m.o = m.o[:n] } } diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index dd1a63e0e..8818463b5 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -162,6 +162,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) ( if o.pool < len(z.serverPools) && o.set < len(z.serverPools[o.pool].sets) { o.debugln("Resuming", o) entries, err = z.serverPools[o.pool].sets[o.set].streamMetadataParts(ctx, *o) + entries.reuse = true // We read from stream and are not sharing results. if err == nil { return entries, nil } @@ -217,6 +218,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) ( if listErr != nil && !errors.Is(listErr, context.Canceled) { return entries, listErr } + entries.reuse = true truncated := entries.len() > o.Limit || err == nil entries.truncate(o.Limit) if !o.Transient && truncated { @@ -363,13 +365,33 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions o.debugln("listAndSave: listing", o.ID, "finished with ", err) }(*o) + // Keep track of when we return since we no longer have to send entries to output. + var funcReturned bool + var funcReturnedMu sync.Mutex + defer func() { + funcReturnedMu.Lock() + funcReturned = true + funcReturnedMu.Unlock() + }() // Write listing to results and saver. go func() { + var returned bool for entry := range inCh { - outCh <- entry + if !returned { + funcReturnedMu.Lock() + returned = funcReturned + funcReturnedMu.Unlock() + outCh <- entry + if returned { + close(outCh) + } + } + entry.reusable = returned saveCh <- entry } - close(outCh) + if !returned { + close(outCh) + } close(saveCh) }() diff --git a/cmd/metacache-stream.go b/cmd/metacache-stream.go index c11227354..8c0963432 100644 --- a/cmd/metacache-stream.go +++ b/cmd/metacache-stream.go @@ -142,8 +142,10 @@ func (w *metacacheWriter) write(objs ...metaCacheEntry) error { if err != nil { return err } - if w.reuseBlocks && cap(o.metadata) >= metaDataReadDefault { - metaDataPool.Put(o.metadata) + if w.reuseBlocks || o.reusable { + if cap(o.metadata) >= metaDataReadDefault && cap(o.metadata) < metaDataReadDefault*4 { + metaDataPool.Put(o.metadata) + } } } @@ -358,10 +360,14 @@ func (r *metacacheReader) next() (metaCacheEntry, error) { r.err = err return m, err } - m.metadata, err = r.mr.ReadBytes(nil) + m.metadata, err = r.mr.ReadBytes(metaDataPool.Get().([]byte)[:0]) if err == io.EOF { err = io.ErrUnexpectedEOF } + if len(m.metadata) == 0 && cap(m.metadata) >= metaDataReadDefault { + metaDataPool.Put(m.metadata) + m.metadata = nil + } r.err = err return m, err } @@ -514,13 +520,17 @@ func (r *metacacheReader) readN(n int, inclDeleted, inclDirs bool, prefix string r.mr.R.Skip(1) return metaCacheEntriesSorted{o: res}, io.EOF } - if meta.metadata, err = r.mr.ReadBytes(nil); err != nil { + if meta.metadata, err = r.mr.ReadBytes(metaDataPool.Get().([]byte)[:0]); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF } r.err = err return metaCacheEntriesSorted{o: res}, err } + if len(meta.metadata) == 0 && cap(meta.metadata) >= metaDataReadDefault { + metaDataPool.Put(meta.metadata) + meta.metadata = nil + } if !inclDirs && meta.isDir() { continue } @@ -569,13 +579,17 @@ func (r *metacacheReader) readAll(ctx context.Context, dst chan<- metaCacheEntry r.err = err return err } - if meta.metadata, err = r.mr.ReadBytes(nil); err != nil { + if meta.metadata, err = r.mr.ReadBytes(metaDataPool.Get().([]byte)[:0]); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF } r.err = err return err } + if len(meta.metadata) == 0 && cap(meta.metadata) >= metaDataReadDefault { + metaDataPool.Put(meta.metadata) + meta.metadata = nil + } select { case <-ctx.Done(): r.err = ctx.Err()