mirror of
https://github.com/minio/minio.git
synced 2025-01-22 20:23:14 -05:00
merge versions across sets when listing (#16003)
This commit is contained in:
parent
2894dd4d1a
commit
808ecfe0f2
@ -621,7 +621,7 @@ func (m *metaCacheEntriesSorted) forwardPast(s string) {
|
||||
// The entry not chosen will be discarded.
|
||||
// If the context is canceled the function will return the error,
|
||||
// otherwise the function will return nil.
|
||||
func mergeEntryChannels(ctx context.Context, in []chan metaCacheEntry, out chan<- metaCacheEntry, compareMeta func(existing, other *metaCacheEntry) (replace bool)) error {
|
||||
func mergeEntryChannels(ctx context.Context, in []chan metaCacheEntry, out chan<- metaCacheEntry, readQuorum int) error {
|
||||
defer close(out)
|
||||
top := make([]*metaCacheEntry, len(in))
|
||||
nDone := 0
|
||||
@ -667,6 +667,7 @@ func mergeEntryChannels(ctx context.Context, in []chan metaCacheEntry, out chan<
|
||||
}
|
||||
}
|
||||
last := ""
|
||||
var toMerge []int
|
||||
|
||||
// Choose the best to return.
|
||||
for {
|
||||
@ -675,6 +676,7 @@ func mergeEntryChannels(ctx context.Context, in []chan metaCacheEntry, out chan<
|
||||
}
|
||||
best := top[0]
|
||||
bestIdx := 0
|
||||
toMerge = toMerge[:0]
|
||||
for i, other := range top[1:] {
|
||||
otherIdx := i + 1
|
||||
if other == nil {
|
||||
@ -693,24 +695,64 @@ func mergeEntryChannels(ctx context.Context, in []chan metaCacheEntry, out chan<
|
||||
// we compare the `foo-1/` after path.Clean() to
|
||||
// de-dup the entries.
|
||||
if path.Clean(best.name) == path.Clean(other.name) {
|
||||
if compareMeta(best, other) {
|
||||
// Replace "best"
|
||||
if err := selectFrom(bestIdx); err != nil {
|
||||
return err
|
||||
}
|
||||
best = other
|
||||
bestIdx = otherIdx
|
||||
} else if err := selectFrom(otherIdx); err != nil {
|
||||
// Keep best, replace "other"
|
||||
return err
|
||||
}
|
||||
toMerge = append(toMerge, otherIdx)
|
||||
continue
|
||||
}
|
||||
if best.name > other.name {
|
||||
toMerge = toMerge[:0]
|
||||
best = other
|
||||
bestIdx = otherIdx
|
||||
}
|
||||
}
|
||||
|
||||
// Merge any unmerged
|
||||
if len(toMerge) > 0 {
|
||||
versions := make([]xlMetaV2ShallowVersion, 0, len(toMerge)+1)
|
||||
xl, err := best.xlmeta()
|
||||
if err == nil {
|
||||
versions = append(versions, xl.versions...)
|
||||
}
|
||||
for _, idx := range toMerge {
|
||||
other := top[idx]
|
||||
if other == nil {
|
||||
continue
|
||||
}
|
||||
xl2, err := other.xlmeta()
|
||||
if err != nil {
|
||||
if err := selectFrom(idx); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if xl == nil {
|
||||
// Discard current "best"
|
||||
if err := selectFrom(bestIdx); err != nil {
|
||||
return err
|
||||
}
|
||||
bestIdx = idx
|
||||
best = other
|
||||
xl = xl2
|
||||
} else {
|
||||
// Mark read, unless we added it as new "best".
|
||||
if err := selectFrom(idx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
versions = append(versions, xl2.versions...)
|
||||
}
|
||||
if xl != nil && len(versions) > 0 {
|
||||
// Merge all versions. 'strict' doesn't matter since we only need one.
|
||||
xl.versions = mergeXLV2Versions(readQuorum, true, 0, versions)
|
||||
if meta, err := xl.AppendTo(metaDataPoolGet()); err == nil {
|
||||
if best.reusable {
|
||||
metaDataPoolPut(best.metadata)
|
||||
}
|
||||
best.metadata = meta
|
||||
best.cached = xl
|
||||
}
|
||||
}
|
||||
toMerge = toMerge[:0]
|
||||
}
|
||||
if best.name > last {
|
||||
select {
|
||||
case <-ctxDone:
|
||||
|
@ -299,29 +299,8 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
|
||||
}
|
||||
|
||||
// Gather results to a single channel.
|
||||
err := mergeEntryChannels(ctx, inputs, results, func(existing, other *metaCacheEntry) (replace bool) {
|
||||
// Pick object over directory
|
||||
if existing.isDir() && !other.isDir() {
|
||||
return true
|
||||
}
|
||||
if !existing.isDir() && other.isDir() {
|
||||
return false
|
||||
}
|
||||
eMeta, err := existing.xlmeta()
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
oMeta, err := other.xlmeta()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
// Replace if modtime is newer
|
||||
if !oMeta.latestModtime().Equal(oMeta.latestModtime()) {
|
||||
return oMeta.latestModtime().After(eMeta.latestModtime())
|
||||
}
|
||||
// Use NumVersions as a final tiebreaker.
|
||||
return len(oMeta.versions) > len(eMeta.versions)
|
||||
})
|
||||
// Quorum is one since we are merging across sets.
|
||||
err := mergeEntryChannels(ctx, inputs, results, 1)
|
||||
|
||||
cancelList()
|
||||
wg.Wait()
|
||||
|
Loading…
x
Reference in New Issue
Block a user