fix: use NumVersions for list resolver (#12599)

also do not incorrectly double count
objExists unless its selected and it
matches with previous entry.

Bonus: change listQuorum to match with
AskDisks to ensure that we atleast by
default choose all the "drives" that
we asked is consistent.
This commit is contained in:
Harshavardhana 2021-06-30 07:43:19 -07:00 committed by GitHub
parent 3137dc2eb3
commit a6ad965799
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 9 deletions

View File

@ -61,7 +61,6 @@ func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
} }
di, err := disks[i-1].DiskInfo(context.Background()) di, err := disks[i-1].DiskInfo(context.Background())
if err != nil || di.Healing { if err != nil || di.Healing {
// - Do not consume disks which are not reachable // - Do not consume disks which are not reachable
// unformatted or simply not accessible for some reason. // unformatted or simply not accessible for some reason.
// //

View File

@ -80,6 +80,12 @@ func (e *metaCacheEntry) matches(other *metaCacheEntry, bucket string) bool {
return eErr == oErr return eErr == oErr
} }
// check both fileInfo's have same number of versions, if not skip
// the `other` entry.
if eFi.NumVersions != oFi.NumVersions {
return false
}
return eFi.ModTime.Equal(oFi.ModTime) && eFi.Size == oFi.Size && eFi.VersionID == oFi.VersionID return eFi.ModTime.Equal(oFi.ModTime) && eFi.Size == oFi.Size && eFi.VersionID == oFi.VersionID
} }
@ -202,12 +208,12 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa
dirExists := 0 dirExists := 0
objExists := 0 objExists := 0
var selFIV *FileInfo
for i := range m { for i := range m {
entry := &m[i] entry := &m[i]
if entry.name == "" { if entry.name == "" {
continue continue
} }
if entry.isDir() { if entry.isDir() {
dirExists++ dirExists++
selected = entry selected = entry
@ -215,19 +221,18 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa
} }
// Get new entry metadata // Get new entry metadata
fiv, err := entry.fileInfo(r.bucket) if _, err := entry.fileInfo(r.bucket); err != nil {
if err != nil {
continue continue
} }
objExists++ if selected == nil {
if selFIV == nil { objExists++
selected = entry selected = entry
selFIV = fiv
continue continue
} }
if selected.matches(entry, r.bucket) { if selected.matches(entry, r.bucket) {
objExists++
continue continue
} }
} }

View File

@ -572,8 +572,11 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
}() }()
askDisks := o.AskDisks askDisks := o.AskDisks
listingQuorum := askDisks - 1 if askDisks == 0 {
// Special case: ask all disks if the drive count is 4 askDisks = globalAPIConfig.getListQuorum()
}
// make sure atleast default '3' lists object is present.
listingQuorum := askDisks
if askDisks == -1 || er.setDriveCount == 4 { if askDisks == -1 || er.setDriveCount == 4 {
askDisks = len(disks) // with 'strict' quorum list on all online disks. askDisks = len(disks) // with 'strict' quorum list on all online disks.
listingQuorum = getReadQuorum(er.setDriveCount) listingQuorum = getReadQuorum(er.setDriveCount)