fix: set scanning details locally to avoid cached values (#18092)

atomic variable results such as scanning must not use
cached values, instead rely on real-time information.
This commit is contained in:
Harshavardhana 2023-09-25 08:26:29 -07:00 committed by GitHub
parent 21e8e071d7
commit ac3a19138a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 2 deletions

View File

@ -30,6 +30,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/minio/madmin-go/v3" "github.com/minio/madmin-go/v3"
@ -134,6 +135,8 @@ func toStorageErr(err error) error {
// Abstracts a remote disk. // Abstracts a remote disk.
type storageRESTClient struct { type storageRESTClient struct {
scanning int32
endpoint Endpoint endpoint Endpoint
restClient *rest.Client restClient *rest.Client
diskID string diskID string
@ -207,6 +210,9 @@ func (client *storageRESTClient) Healing() *healingTracker {
} }
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) { func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
atomic.AddInt32(&client.scanning, 1)
defer atomic.AddInt32(&client.scanning, -1)
defer close(updates) defer close(updates)
pr, pw := io.Pipe() pr, pw := io.Pipe()
go func() { go func() {
@ -281,6 +287,8 @@ func (client *storageRESTClient) DiskInfo(_ context.Context, metrics bool) (info
// transport is already down. // transport is already down.
return info, errDiskNotFound return info, errDiskNotFound
} }
// Do not cache results from atomic variables
scanning := atomic.LoadInt32(&client.scanning) == 1
client.diskInfoCache.Once.Do(func() { client.diskInfoCache.Once.Do(func() {
client.diskInfoCache.TTL = time.Second client.diskInfoCache.TTL = time.Second
client.diskInfoCache.Update = func() (interface{}, error) { client.diskInfoCache.Update = func() (interface{}, error) {
@ -306,8 +314,9 @@ func (client *storageRESTClient) DiskInfo(_ context.Context, metrics bool) (info
}) })
val, err := client.diskInfoCache.Get() val, err := client.diskInfoCache.Get()
if val != nil { if val != nil {
return val.(DiskInfo), err info = val.(DiskInfo)
} }
info.Scanning = scanning
return info, err return info, err
} }

View File

@ -462,6 +462,7 @@ func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte,
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) { func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
atomic.AddInt32(&s.scanning, 1) atomic.AddInt32(&s.scanning, 1)
defer atomic.AddInt32(&s.scanning, -1) defer atomic.AddInt32(&s.scanning, -1)
var err error var err error
stopFn := globalScannerMetrics.log(scannerMetricScanBucketDrive, s.drivePath, cache.Info.Name) stopFn := globalScannerMetrics.log(scannerMetricScanBucketDrive, s.drivePath, cache.Info.Name)
defer func() { defer func() {
@ -650,6 +651,9 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
// DiskInfo provides current information about disk space usage, // DiskInfo provides current information about disk space usage,
// total free inodes and underlying filesystem. // total free inodes and underlying filesystem.
func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err error) { func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err error) {
// Do not cache results from atomic variables
scanning := atomic.LoadInt32(&s.scanning) == 1
s.diskInfoCache.Once.Do(func() { s.diskInfoCache.Once.Do(func() {
s.diskInfoCache.TTL = time.Second s.diskInfoCache.TTL = time.Second
s.diskInfoCache.Update = func() (interface{}, error) { s.diskInfoCache.Update = func() (interface{}, error) {
@ -677,7 +681,6 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err erro
// - if we found an unformatted disk (no 'format.json') // - if we found an unformatted disk (no 'format.json')
// - if we found healing tracker 'healing.bin' // - if we found healing tracker 'healing.bin'
dcinfo.Healing = errors.Is(err, errUnformattedDisk) || (s.Healing() != nil) dcinfo.Healing = errors.Is(err, errUnformattedDisk) || (s.Healing() != nil)
dcinfo.Scanning = atomic.LoadInt32(&s.scanning) == 1
dcinfo.ID = diskID dcinfo.ID = diskID
return dcinfo, err return dcinfo, err
} }
@ -687,6 +690,7 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err erro
if v != nil { if v != nil {
info = v.(DiskInfo) info = v.(DiskInfo)
} }
info.Scanning = scanning
return info, err return info, err
} }