fix: multi-way list entry resolution (#12617)

Fixes brought forward from https://github.com/minio/minio/pull/12605

Fixes resolution when an object is in prefix of another and one zone returns the directory and another the object.

Fixes resolution on single entries that arrive first, so resolution doesn't depend on order.
This commit is contained in:
Klaus Post 2021-07-02 09:54:00 -07:00 committed by GitHub
parent 0ad03908d0
commit f706671568
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 98 additions and 20 deletions

View File

@ -89,6 +89,38 @@ func (e *metaCacheEntry) matches(other *metaCacheEntry, bucket string) bool {
return eFi.ModTime.Equal(oFi.ModTime) && eFi.Size == oFi.Size && eFi.VersionID == oFi.VersionID
}
// resolveEntries returns if the entries match by comparing their latest version fileinfo.
func resolveEntries(a, b *metaCacheEntry, bucket string) *metaCacheEntry {
if b == nil {
return a
}
if a == nil {
return b
}
aFi, err := a.fileInfo(bucket)
if err != nil {
return b
}
bFi, err := b.fileInfo(bucket)
if err != nil {
return a
}
if !aFi.ModTime.Equal(bFi.ModTime) {
if aFi.ModTime.After(bFi.ModTime) {
return a
}
return b
}
if aFi.NumVersions > bFi.NumVersions {
return a
}
return b
}
// isInDir returns whether the entry is in the dir when considering the separator.
func (e metaCacheEntry) isInDir(dir, separator string) bool {
if len(dir) == 0 {
@ -199,15 +231,29 @@ type metadataResolutionParams struct {
dirQuorum int // Number if disks needed for a directory to 'exist'.
objQuorum int // Number of disks needed for an object to 'exist'.
bucket string // Name of the bucket. Used for generating cached fileinfo.
// Reusable slice for resolution
candidates []struct {
n int
e *metaCacheEntry
}
}
// resolve multiple entries.
// entries are resolved by majority, then if tied by mod-time and versions.
func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCacheEntry, ok bool) {
if len(m) == 0 {
return nil, false
}
dirExists := 0
objExists := 0
if cap(r.candidates) < len(m) {
r.candidates = make([]struct {
n int
e *metaCacheEntry
}, 0, len(m))
}
r.candidates = r.candidates[0:]
for i := range m {
entry := &m[i]
if entry.name == "" {
@ -225,28 +271,56 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa
continue
}
found := false
for i, c := range r.candidates {
if c.e.matches(entry, r.bucket) {
c.n++
r.candidates[i] = c
found = true
break
}
}
if !found {
r.candidates = append(r.candidates, struct {
n int
e *metaCacheEntry
}{n: 1, e: entry})
}
}
if selected != nil && selected.isDir() && dirExists > r.dirQuorum {
return selected, true
}
switch len(r.candidates) {
case 0:
if selected == nil {
objExists++
selected = entry
continue
return nil, false
}
if selected.matches(entry, r.bucket) {
objExists++
continue
if !selected.isDir() || dirExists < r.dirQuorum {
return nil, false
}
return selected, true
case 1:
cand := r.candidates[0]
if cand.n < r.objQuorum {
return nil, false
}
return cand.e, true
default:
// Sort by matches....
sort.Slice(r.candidates, func(i, j int) bool {
return r.candidates[i].n > r.candidates[j].n
})
// Check if we have enough.
if r.candidates[0].n < r.objQuorum {
return nil, false
}
if r.candidates[0].n > r.candidates[1].n {
return r.candidates[0].e, true
}
// Tie between two, resolve using modtime+versions.
return resolveEntries(r.candidates[0].e, r.candidates[1].e, r.bucket), true
}
if selected == nil {
return nil, false
}
if selected.isDir() && dirExists < r.dirQuorum {
return nil, false
} else if !selected.isDir() && objExists < r.objQuorum {
return nil, false
}
return selected, true
}
// firstFound returns the first found and the number of set entries.

View File

@ -182,7 +182,11 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
// Resolve non-trivial conflicts
entries.deduplicate(func(existing, other *metaCacheEntry) (replace bool) {
if existing.isDir() {
// Pick object over directory
if existing.isDir() && !other.isDir() {
return true
}
if !existing.isDir() && other.isDir() {
return false
}
eFIV, err := existing.fileInfo(o.Bucket)