From 42157eb2185fc674e077a8daa69a44d1e848601d Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 25 Jan 2021 21:25:51 -0800 Subject: [PATCH] listing also match sets index for proper quorum --- cmd/erasure-server-sets.go | 12 +++++++---- cmd/erasure-sets.go | 41 ++++++++++++++++++++++---------------- cmd/global-heal.go | 3 ++- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/cmd/erasure-server-sets.go b/cmd/erasure-server-sets.go index 35e6d593d..4c199f068 100644 --- a/cmd/erasure-server-sets.go +++ b/cmd/erasure-server-sets.go @@ -957,8 +957,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI var lentry FileInfo var found bool var zoneIndex = -1 - // TODO: following loop can be merged with above - // loop, explore this possibility. + var setIndex = -1 for i, entriesValid := range zoneEntriesValid { for j, valid := range entriesValid { if !valid { @@ -968,6 +967,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI lentry = zoneEntries[i][j] found = true zoneIndex = i + setIndex = zoneEntryChs[i][j].SetIndex continue } str1 := zoneEntries[i][j].Name @@ -982,6 +982,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI if str1 < str2 { lentry = zoneEntries[i][j] zoneIndex = i + setIndex = zoneEntryChs[i][j].SetIndex } } } @@ -1001,7 +1002,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI // Entries are duplicated across disks, // we should simply skip such entries. - if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) { + if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) && setIndex == zoneEntryChs[i][j].SetIndex { lexicallySortedEntryCount++ continue } @@ -1050,6 +1051,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE var lentry FileInfoVersions var found bool var zoneIndex = -1 + var setIndex = -1 for i, entriesValid := range zoneEntriesValid { for j, valid := range entriesValid { if !valid { @@ -1059,6 +1061,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE lentry = zoneEntries[i][j] found = true zoneIndex = i + setIndex = zoneEntryChs[i][j].SetIndex continue } str1 := zoneEntries[i][j].Name @@ -1073,6 +1076,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE if str1 < str2 { lentry = zoneEntries[i][j] zoneIndex = i + setIndex = zoneEntryChs[i][j].SetIndex } } } @@ -1092,7 +1096,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE // Entries are duplicated across disks, // we should simply skip such entries. - if lentry.Name == zoneEntries[i][j].Name && lentry.LatestModTime.Equal(zoneEntries[i][j].LatestModTime) { + if lentry.Name == zoneEntries[i][j].Name && lentry.LatestModTime.Equal(zoneEntries[i][j].LatestModTime) && setIndex == zoneEntryChs[i][j].SetIndex { lexicallySortedEntryCount++ continue } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 02283ea13..e1c508d2e 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -832,9 +832,10 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB // FileInfoVersionsCh - file info versions channel type FileInfoVersionsCh struct { - Ch chan FileInfoVersions - Prev FileInfoVersions - Valid bool + Ch chan FileInfoVersions + Prev FileInfoVersions + Valid bool + SetIndex int } // Pop - pops a cached entry if any, or from the cached channel. @@ -855,9 +856,10 @@ func (f *FileInfoVersionsCh) Push(fi FileInfoVersions) { // FileInfoCh - file info channel type FileInfoCh struct { - Ch chan FileInfo - Prev FileInfo - Valid bool + Ch chan FileInfo + Prev FileInfo + Valid bool + SetIndex int } // Pop - pops a cached entry if any, or from the cached channel. @@ -884,8 +886,8 @@ func (f *FileInfoCh) Push(fi FileInfo) { // if the caller wishes to list N entries to call lexicallySortedEntry // N times until this boolean is 'false'. func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) { - for j := range entryChs { - entries[j], entriesValid[j] = entryChs[j].Pop() + for i := range entryChs { + entries[i], entriesValid[i] = entryChs[i].Pop() } var isTruncated = false @@ -899,6 +901,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI var lentry FileInfoVersions var found bool + var setIndex = -1 for i, valid := range entriesValid { if !valid { continue @@ -906,10 +909,12 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI if !found { lentry = entries[i] found = true + setIndex = i continue } if entries[i].Name < lentry.Name { lentry = entries[i] + setIndex = i } } @@ -927,7 +932,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI // Entries are duplicated across disks, // we should simply skip such entries. - if lentry.Name == entries[i].Name && lentry.LatestModTime.Equal(entries[i].LatestModTime) { + if lentry.Name == entries[i].Name && lentry.LatestModTime.Equal(entries[i].LatestModTime) && setIndex == entryChs[i].SetIndex { lexicallySortedEntryCount++ continue } @@ -954,11 +959,11 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref var entryChs []FileInfoVersionsCh var wg sync.WaitGroup var mutex sync.Mutex - for _, set := range s.sets { + for i, set := range s.sets { // Reset for the next erasure set. for _, disk := range set.getLoadBalancedNDisks(ndisks) { wg.Add(1) - go func(disk StorageAPI) { + go func(i int, disk StorageAPI) { defer wg.Done() entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh) @@ -968,10 +973,11 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref mutex.Lock() entryChs = append(entryChs, FileInfoVersionsCh{ - Ch: entryCh, + Ch: entryCh, + SetIndex: i, }) mutex.Unlock() - }(disk) + }(i, disk) } } wg.Wait() @@ -984,11 +990,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark var entryChs []FileInfoCh var wg sync.WaitGroup var mutex sync.Mutex - for _, set := range s.sets { + for i, set := range s.sets { // Reset for the next erasure set. for _, disk := range set.getLoadBalancedNDisks(ndisks) { wg.Add(1) - go func(disk StorageAPI) { + go func(i int, disk StorageAPI) { defer wg.Done() var entryCh chan FileInfo @@ -1004,10 +1010,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark } mutex.Lock() entryChs = append(entryChs, FileInfoCh{ - Ch: entryCh, + Ch: entryCh, + SetIndex: i, }) mutex.Unlock() - }(disk) + }(i, disk) } } wg.Wait() diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 03d91c2c1..380d605cf 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -153,7 +153,8 @@ func healErasureSet(ctx context.Context, setIndex int, setDriveCount int, maxIO } mu.Lock() entryChs = append(entryChs, FileInfoVersionsCh{ - Ch: entryCh, + Ch: entryCh, + SetIndex: setIndex, }) mu.Unlock() }(disk)