From f713436dd0c3f810b3b552281ce94160cd6b7c4b Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Mon, 30 Jan 2023 18:13:53 +0100 Subject: [PATCH] Fix truncated list response on deleted replicated objects (#16504) --- cmd/metacache-entries.go | 37 +++++++++++++++++++++++++++++++++++++ cmd/metacache-set.go | 5 ++++- cmd/metacache-walk.go | 2 +- cmd/xl-storage-format-v2.go | 31 +++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index 5443c990f..6f2475c68 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -189,6 +189,43 @@ func (e *metaCacheEntry) isLatestDeletemarker() bool { return xlMeta.versions[0].header.Type == DeleteType } +// isAllFreeVersions returns if all objects are free versions. +// If metadata is NOT versioned false will always be returned. +// If v2 and UNABLE to load metadata true will be returned. +func (e *metaCacheEntry) isAllFreeVersions() bool { + if e.cached != nil { + if len(e.cached.versions) == 0 { + return true + } + for _, v := range e.cached.versions { + if !v.header.FreeVersion() { + return false + } + } + return true + } + if !isXL2V1Format(e.metadata) { + return false + } + if meta, _, err := isIndexedMetaV2(e.metadata); meta != nil { + return meta.AllHidden(false) + } else if err != nil { + return true + } + // Fall back... + xlMeta, err := e.xlmeta() + if err != nil || len(xlMeta.versions) == 0 { + return true + } + // Check versions.. + for _, v := range e.cached.versions { + if !v.header.FreeVersion() { + return false + } + } + return true +} + // fileInfo returns the decoded metadata. // If entry is a directory it is returned as that. // If versioned the latest version will be returned. diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 2a14cf48b..9d4e9f231 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -367,7 +367,10 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSor return true } if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() { - return entries.len() < o.Limit + return true + } + if entry.isAllFreeVersions() { + return true } entries.o = append(entries.o, entry) return entries.len() < o.Limit diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 8b1c2a5ee..dadf67ad5 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -93,7 +93,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ if opts.Limit <= 0 { return } - if m, _, _ := isIndexedMetaV2(metadata); m != nil && !m.IsLatestDeleteMarker() { + if m, _, _ := isIndexedMetaV2(metadata); m != nil && !m.AllHidden(true) { objsReturned++ } } diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 81df38b35..3d3ce0a3c 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -2129,3 +2129,34 @@ func (x xlMetaBuf) IsLatestDeleteMarker() bool { }) return isDeleteMarker } + +// AllHidden returns true are no versions that would show up in a listing (ie all free markers) +// Optionally also return early if top is a delete marker. +func (x xlMetaBuf) AllHidden(topDeleteMarker bool) bool { + vers, headerV, _, buf, err := decodeXLHeaders(x) + if err != nil { + return false + } + if vers == 0 { + return true + } + hidden := true + + var xl xlMetaV2VersionHeader + _ = decodeVersions(buf, vers, func(idx int, hdr, _ []byte) error { + if _, err := xl.unmarshalV(headerV, hdr); err != nil { + return errDoneForNow + } + if topDeleteMarker && idx == 0 && xl.Type == DeleteType { + hidden = true + return errDoneForNow + } + if !xl.FreeVersion() { + hidden = false + return errDoneForNow + } + // Check next version + return nil + }) + return hidden +}