listing: also match set index for proper quorum

This commit is contained in:
Harshavardhana 2021-01-25 21:25:51 -08:00
parent fa1cd6dcce
commit 42157eb218
3 changed files with 34 additions and 22 deletions

View File

@ -957,8 +957,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
var lentry FileInfo var lentry FileInfo
var found bool var found bool
var zoneIndex = -1 var zoneIndex = -1
// TODO: following loop can be merged with above var setIndex = -1
// loop, explore this possibility.
for i, entriesValid := range zoneEntriesValid { for i, entriesValid := range zoneEntriesValid {
for j, valid := range entriesValid { for j, valid := range entriesValid {
if !valid { if !valid {
@ -968,6 +967,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
lentry = zoneEntries[i][j] lentry = zoneEntries[i][j]
found = true found = true
zoneIndex = i zoneIndex = i
setIndex = zoneEntryChs[i][j].SetIndex
continue continue
} }
str1 := zoneEntries[i][j].Name str1 := zoneEntries[i][j].Name
@ -982,6 +982,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
if str1 < str2 { if str1 < str2 {
lentry = zoneEntries[i][j] lentry = zoneEntries[i][j]
zoneIndex = i zoneIndex = i
setIndex = zoneEntryChs[i][j].SetIndex
} }
} }
} }
@ -1001,7 +1002,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
// Entries are duplicated across disks, // Entries are duplicated across disks,
// we should simply skip such entries. // we should simply skip such entries.
if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) { if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) && setIndex == zoneEntryChs[i][j].SetIndex {
lexicallySortedEntryCount++ lexicallySortedEntryCount++
continue continue
} }
@ -1050,6 +1051,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
var lentry FileInfoVersions var lentry FileInfoVersions
var found bool var found bool
var zoneIndex = -1 var zoneIndex = -1
var setIndex = -1
for i, entriesValid := range zoneEntriesValid { for i, entriesValid := range zoneEntriesValid {
for j, valid := range entriesValid { for j, valid := range entriesValid {
if !valid { if !valid {
@ -1059,6 +1061,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
lentry = zoneEntries[i][j] lentry = zoneEntries[i][j]
found = true found = true
zoneIndex = i zoneIndex = i
setIndex = zoneEntryChs[i][j].SetIndex
continue continue
} }
str1 := zoneEntries[i][j].Name str1 := zoneEntries[i][j].Name
@ -1073,6 +1076,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
if str1 < str2 { if str1 < str2 {
lentry = zoneEntries[i][j] lentry = zoneEntries[i][j]
zoneIndex = i zoneIndex = i
setIndex = zoneEntryChs[i][j].SetIndex
} }
} }
} }
@ -1092,7 +1096,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
// Entries are duplicated across disks, // Entries are duplicated across disks,
// we should simply skip such entries. // we should simply skip such entries.
if lentry.Name == zoneEntries[i][j].Name && lentry.LatestModTime.Equal(zoneEntries[i][j].LatestModTime) { if lentry.Name == zoneEntries[i][j].Name && lentry.LatestModTime.Equal(zoneEntries[i][j].LatestModTime) && setIndex == zoneEntryChs[i][j].SetIndex {
lexicallySortedEntryCount++ lexicallySortedEntryCount++
continue continue
} }

View File

@ -832,9 +832,10 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
// FileInfoVersionsCh - file info versions channel // FileInfoVersionsCh - file info versions channel
type FileInfoVersionsCh struct { type FileInfoVersionsCh struct {
Ch chan FileInfoVersions Ch chan FileInfoVersions
Prev FileInfoVersions Prev FileInfoVersions
Valid bool Valid bool
SetIndex int
} }
// Pop - pops a cached entry if any, or from the cached channel. // Pop - pops a cached entry if any, or from the cached channel.
@ -855,9 +856,10 @@ func (f *FileInfoVersionsCh) Push(fi FileInfoVersions) {
// FileInfoCh - file info channel // FileInfoCh - file info channel
type FileInfoCh struct { type FileInfoCh struct {
Ch chan FileInfo Ch chan FileInfo
Prev FileInfo Prev FileInfo
Valid bool Valid bool
SetIndex int
} }
// Pop - pops a cached entry if any, or from the cached channel. // Pop - pops a cached entry if any, or from the cached channel.
@ -884,8 +886,8 @@ func (f *FileInfoCh) Push(fi FileInfo) {
// if the caller wishes to list N entries to call lexicallySortedEntry // if the caller wishes to list N entries to call lexicallySortedEntry
// N times until this boolean is 'false'. // N times until this boolean is 'false'.
func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) { func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) {
for j := range entryChs { for i := range entryChs {
entries[j], entriesValid[j] = entryChs[j].Pop() entries[i], entriesValid[i] = entryChs[i].Pop()
} }
var isTruncated = false var isTruncated = false
@ -899,6 +901,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
var lentry FileInfoVersions var lentry FileInfoVersions
var found bool var found bool
var setIndex = -1
for i, valid := range entriesValid { for i, valid := range entriesValid {
if !valid { if !valid {
continue continue
@ -906,10 +909,12 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
if !found { if !found {
lentry = entries[i] lentry = entries[i]
found = true found = true
setIndex = i
continue continue
} }
if entries[i].Name < lentry.Name { if entries[i].Name < lentry.Name {
lentry = entries[i] lentry = entries[i]
setIndex = i
} }
} }
@ -927,7 +932,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
// Entries are duplicated across disks, // Entries are duplicated across disks,
// we should simply skip such entries. // we should simply skip such entries.
if lentry.Name == entries[i].Name && lentry.LatestModTime.Equal(entries[i].LatestModTime) { if lentry.Name == entries[i].Name && lentry.LatestModTime.Equal(entries[i].LatestModTime) && setIndex == entryChs[i].SetIndex {
lexicallySortedEntryCount++ lexicallySortedEntryCount++
continue continue
} }
@ -954,11 +959,11 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
var entryChs []FileInfoVersionsCh var entryChs []FileInfoVersionsCh
var wg sync.WaitGroup var wg sync.WaitGroup
var mutex sync.Mutex var mutex sync.Mutex
for _, set := range s.sets { for i, set := range s.sets {
// Reset for the next erasure set. // Reset for the next erasure set.
for _, disk := range set.getLoadBalancedNDisks(ndisks) { for _, disk := range set.getLoadBalancedNDisks(ndisks) {
wg.Add(1) wg.Add(1)
go func(disk StorageAPI) { go func(i int, disk StorageAPI) {
defer wg.Done() defer wg.Done()
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh) entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
@ -968,10 +973,11 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
mutex.Lock() mutex.Lock()
entryChs = append(entryChs, FileInfoVersionsCh{ entryChs = append(entryChs, FileInfoVersionsCh{
Ch: entryCh, Ch: entryCh,
SetIndex: i,
}) })
mutex.Unlock() mutex.Unlock()
}(disk) }(i, disk)
} }
} }
wg.Wait() wg.Wait()
@ -984,11 +990,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
var entryChs []FileInfoCh var entryChs []FileInfoCh
var wg sync.WaitGroup var wg sync.WaitGroup
var mutex sync.Mutex var mutex sync.Mutex
for _, set := range s.sets { for i, set := range s.sets {
// Reset for the next erasure set. // Reset for the next erasure set.
for _, disk := range set.getLoadBalancedNDisks(ndisks) { for _, disk := range set.getLoadBalancedNDisks(ndisks) {
wg.Add(1) wg.Add(1)
go func(disk StorageAPI) { go func(i int, disk StorageAPI) {
defer wg.Done() defer wg.Done()
var entryCh chan FileInfo var entryCh chan FileInfo
@ -1004,10 +1010,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
} }
mutex.Lock() mutex.Lock()
entryChs = append(entryChs, FileInfoCh{ entryChs = append(entryChs, FileInfoCh{
Ch: entryCh, Ch: entryCh,
SetIndex: i,
}) })
mutex.Unlock() mutex.Unlock()
}(disk) }(i, disk)
} }
} }
wg.Wait() wg.Wait()

View File

@ -153,7 +153,8 @@ func healErasureSet(ctx context.Context, setIndex int, setDriveCount int, maxIO
} }
mu.Lock() mu.Lock()
entryChs = append(entryChs, FileInfoVersionsCh{ entryChs = append(entryChs, FileInfoVersionsCh{
Ch: entryCh, Ch: entryCh,
SetIndex: setIndex,
}) })
mu.Unlock() mu.Unlock()
}(disk) }(disk)