avoid passing around poolIdx, setIdx instead pass the relevant disks (#17660)

This commit is contained in:
Harshavardhana 2023-07-17 09:52:05 -07:00 committed by GitHub
parent 9b5c2c386a
commit 24e86d0c59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 30 deletions

View File

@ -485,7 +485,12 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
}
// Remove .healing.bin from all disks with similar heal-id
for _, disk := range z.serverPools[poolIdx].sets[setIdx].getDisks() {
disks, err := z.GetDisks(poolIdx, setIdx)
if err != nil {
return err
}
for _, disk := range disks {
t, err := loadHealingTracker(ctx, disk)
if err != nil {
if !errors.Is(err, errFileNotFound) {

View File

@ -33,7 +33,6 @@ import (
"sync"
"time"
"github.com/dustin/go-humanize"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
@ -302,7 +301,7 @@ type folderScanner struct {
// The returned cache will always be valid, but may not be updated from the existing.
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
// If the supplied context is canceled the function will return at the first chance.
func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) {
func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) {
switch cache.Info.Name {
case "", dataUsageRoot:
return cache, errors.New("internal error: root scan attempted")
@ -321,20 +320,8 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
scanMode: scanMode,
updates: cache.Info.updates,
updateCurrentPath: updatePath,
}
// Add disks for set healing.
if poolIdx >= 0 && setIdx >= 0 {
objAPI, ok := newObjectLayerFn().(*erasureServerPools)
if ok {
if poolIdx < len(objAPI.serverPools) && setIdx < len(objAPI.serverPools[poolIdx].sets) {
// Pass the disks belonging to the set.
s.disks = objAPI.serverPools[poolIdx].sets[setIdx].getDisks()
s.disksQuorum = len(s.disks) / 2
} else {
logger.LogIf(ctx, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1)))
}
}
disks: disks,
disksQuorum: len(disks) / 2,
}
// Enable healing in XL mode.
@ -649,8 +636,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
break
}
objAPI, ok := newObjectLayerFn().(*erasureServerPools)
if !ok || len(f.disks) == 0 || f.disksQuorum == 0 {
if len(f.disks) == 0 || f.disksQuorum == 0 {
break
}
@ -688,7 +674,9 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
// Bucket might be missing as well with abandoned children.
// make sure it is created first otherwise healing won't proceed
// for objects.
_, _ = objAPI.HealBucket(ctx, bucket, madmin.HealOpts{})
bgSeq.queueHealTask(healSource{
bucket: bucket,
}, madmin.HealItemBucket)
}
resolver.bucket = bucket

View File

@ -62,7 +62,7 @@ func TestDataUsageUpdate(t *testing.T) {
return
}
got, err := scanDataFolder(context.Background(), 0, 0, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0)
got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0)
if err != nil {
t.Fatal(err)
}
@ -173,7 +173,7 @@ func TestDataUsageUpdate(t *testing.T) {
}
// Changed dir must be picked up in this many cycles.
for i := 0; i < dataUsageUpdateDirCycles; i++ {
got, err = scanDataFolder(context.Background(), 0, 0, base, got, getSize, 0)
got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0)
got.Info.NextCycle++
if err != nil {
t.Fatal(err)
@ -280,7 +280,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
}
return
}
got, err := scanDataFolder(context.Background(), 0, 0, base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0)
got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0)
if err != nil {
t.Fatal(err)
}
@ -414,7 +414,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
}
// Changed dir must be picked up in this many cycles.
for i := 0; i < dataUsageUpdateDirCycles; i++ {
got, err = scanDataFolder(context.Background(), 0, 0, base, got, getSize, 0)
got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0)
got.Info.NextCycle++
if err != nil {
t.Fatal(err)
@ -562,7 +562,7 @@ func TestDataUsageCacheSerialize(t *testing.T) {
}
return
}
want, err := scanDataFolder(context.Background(), 0, 0, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0)
want, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0)
if err != nil {
t.Fatal(err)
}

View File

@ -233,7 +233,12 @@ func TestListOnlineDisks(t *testing.T) {
object := "object"
data := bytes.Repeat([]byte("a"), smallFileThreshold*16)
z := obj.(*erasureServerPools)
erasureDisks := z.serverPools[0].sets[0].getDisks()
erasureDisks, err := z.GetDisks(0, 0)
if err != nil {
t.Fatal(err)
}
for i, test := range testCases {
test := test
t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
@ -405,7 +410,12 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
object := "object"
data := bytes.Repeat([]byte("a"), smallFileThreshold/2)
z := obj.(*erasureServerPools)
erasureDisks := z.serverPools[0].sets[0].getDisks()
erasureDisks, err := z.GetDisks(0, 0)
if err != nil {
t.Fatal(err)
}
for i, test := range testCases {
test := test
t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {

View File

@ -485,11 +485,16 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
return cache, errServerNotInitialized
}
cache.Info.updates = updates
poolIdx, setIdx, _ := s.GetDiskLoc()
dataUsageInfo, err := scanDataFolder(ctx, poolIdx, setIdx, s.diskPath, cache, func(item scannerItem) (sizeSummary, error) {
disks, err := objAPI.GetDisks(poolIdx, setIdx)
if err != nil {
return cache, err
}
cache.Info.updates = updates
dataUsageInfo, err := scanDataFolder(ctx, disks, s.diskPath, cache, func(item scannerItem) (sizeSummary, error) {
// Look for `xl.meta/xl.json' at the leaf.
if !strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFile) &&
!strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFileV1) {