fix: change timedValue to return the previously cached value (#15169)

fix: change timedvalue to return previous cached value

caller can interpret the underlying error and decide
accordingly, places where we do not interpret the
errors upon timedValue.Get() - we should simply use
the previously cached value instead of returning "empty".

Bonus: remove some unused code
This commit is contained in:
Harshavardhana 2022-06-25 08:50:16 -07:00 committed by GitHub
parent baf257adcb
commit bd099f5e71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 106 deletions

View File

@ -92,8 +92,6 @@ type erasureSets struct {
distributionAlgo string distributionAlgo string
deploymentID [16]byte deploymentID [16]byte
disksStorageInfoCache timedValue
lastConnectDisksOpTime time.Time lastConnectDisksOpTime time.Time
} }
@ -629,47 +627,6 @@ func (s *erasureSets) ParityCount() int {
return s.defaultParityCount return s.defaultParityCount
} }
// StorageUsageInfo - combines output of StorageInfo across all erasure coded object sets.
// This only returns disk usage info for ServerPools to perform placement decision, this call
// is not implemented in Object interface and is not meant to be used by other object
// layer implementations.
func (s *erasureSets) StorageUsageInfo(ctx context.Context) StorageInfo {
storageUsageInfo := func() StorageInfo {
var storageInfo StorageInfo
storageInfos := make([]StorageInfo, len(s.sets))
storageInfo.Backend.Type = madmin.Erasure
g := errgroup.WithNErrs(len(s.sets))
for index := range s.sets {
index := index
g.Go(func() error {
// ignoring errors on purpose
storageInfos[index], _ = s.sets[index].StorageInfo(ctx)
return nil
}, index)
}
// Wait for the go routines.
g.Wait()
for _, lstorageInfo := range storageInfos {
storageInfo.Disks = append(storageInfo.Disks, lstorageInfo.Disks...)
}
return storageInfo
}
s.disksStorageInfoCache.Once.Do(func() {
s.disksStorageInfoCache.TTL = time.Second
s.disksStorageInfoCache.Update = func() (interface{}, error) {
return storageUsageInfo(), nil
}
})
v, _ := s.disksStorageInfoCache.Get()
return v.(StorageInfo)
}
// StorageInfo - combines output of StorageInfo across all erasure coded object sets. // StorageInfo - combines output of StorageInfo across all erasure coded object sets.
func (s *erasureSets) StorageInfo(ctx context.Context) (StorageInfo, []error) { func (s *erasureSets) StorageInfo(ctx context.Context) (StorageInfo, []error) {
var storageInfo madmin.StorageInfo var storageInfo madmin.StorageInfo

View File

@ -264,18 +264,7 @@ func (m *Metric) copyMetric() Metric {
// once the TTL expires "read()" registered function is called // once the TTL expires "read()" registered function is called
// to return the new values and updated. // to return the new values and updated.
func (g *MetricsGroup) Get() (metrics []Metric) { func (g *MetricsGroup) Get() (metrics []Metric) {
var c interface{} c, _ := g.metricsCache.Get()
var err error
if g.cacheInterval <= 0 {
c, err = g.metricsCache.Update()
} else {
c, err = g.metricsCache.Get()
}
if err != nil {
return []Metric{}
}
m, ok := c.([]Metric) m, ok := c.([]Metric)
if !ok { if !ok {
return []Metric{} return []Metric{}

View File

@ -127,7 +127,6 @@ type storageRESTClient struct {
poolIndex, setIndex, diskIndex int poolIndex, setIndex, diskIndex int
diskInfoCache timedValue diskInfoCache timedValue
diskHealCache timedValue
} }
// Retrieve location indexes. // Retrieve location indexes.
@ -187,25 +186,9 @@ func (client *storageRESTClient) Endpoint() Endpoint {
} }
func (client *storageRESTClient) Healing() *healingTracker { func (client *storageRESTClient) Healing() *healingTracker {
client.diskHealCache.Once.Do(func() { // This call is not implemented for remote client on purpose.
// Update at least every second. // healing tracker is always for local disks.
client.diskHealCache.TTL = time.Second return nil
client.diskHealCache.Update = func() (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
b, err := client.ReadAll(ctx, minioMetaBucket,
pathJoin(bucketMetaPrefix, healingTrackerFilename))
if err != nil {
// If error, likely not healing.
return (*healingTracker)(nil), nil
}
var h healingTracker
_, err = h.UnmarshalMsg(b)
return &h, err
}
})
val, _ := client.diskHealCache.Get()
return val.(*healingTracker)
} }
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) { func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
@ -269,24 +252,6 @@ func (client *storageRESTClient) SetDiskID(id string) {
client.diskID = id client.diskID = id
} }
func (client *storageRESTClient) diskInfo() (interface{}, error) {
var info DiskInfo
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1)
if err != nil {
return info, err
}
defer xhttp.DrainBody(respBody)
if err = msgp.Decode(respBody, &info); err != nil {
return info, err
}
if info.Error != "" {
return info, toStorageErr(errors.New(info.Error))
}
return info, nil
}
// DiskInfo - fetch disk information for a remote disk. // DiskInfo - fetch disk information for a remote disk.
func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) { func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
if !client.IsOnline() { if !client.IsOnline() {
@ -299,7 +264,23 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e
} }
client.diskInfoCache.Once.Do(func() { client.diskInfoCache.Once.Do(func() {
client.diskInfoCache.TTL = time.Second client.diskInfoCache.TTL = time.Second
client.diskInfoCache.Update = client.diskInfo client.diskInfoCache.Update = func() (interface{}, error) {
var info DiskInfo
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1)
if err != nil {
return info, err
}
defer xhttp.DrainBody(respBody)
if err = msgp.Decode(respBody, &info); err != nil {
return info, err
}
if info.Error != "" {
return info, toStorageErr(errors.New(info.Error))
}
return info, nil
}
}) })
val, err := client.diskInfoCache.Get() val, err := client.diskInfoCache.Get()
if val != nil { if val != nil {

View File

@ -950,13 +950,20 @@ type timedValue struct {
// Get will return a cached value or fetch a new one. // Get will return a cached value or fetch a new one.
// If the Update function returns an error the value is forwarded as is and not cached. // If the Update function returns an error the value is forwarded as is and not cached.
func (t *timedValue) Get() (interface{}, error) { func (t *timedValue) Get() (interface{}, error) {
v := t.get() v := t.get(t.ttl())
if v != nil { if v != nil {
return v, nil return v, nil
} }
v, err := t.Update() v, err := t.Update()
if err != nil { if err != nil {
// if update fails, return current
// cached value along with error.
//
// Let the caller decide if they want
// to use the returned value based
// on error.
v = t.get(0)
return v, err return v, err
} }
@ -964,14 +971,21 @@ func (t *timedValue) Get() (interface{}, error) {
return v, nil return v, nil
} }
func (t *timedValue) get() (v interface{}) { func (t *timedValue) ttl() time.Duration {
ttl := t.TTL ttl := t.TTL
if ttl <= 0 { if ttl <= 0 {
ttl = time.Second ttl = time.Second
} }
return ttl
}
func (t *timedValue) get(ttl time.Duration) (v interface{}) {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
v = t.value v = t.value
if ttl <= 0 {
return v
}
if time.Since(t.lastUpdate) < ttl { if time.Since(t.lastUpdate) < ttl {
return v return v
} }

View File

@ -575,22 +575,19 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
dcinfo.FreeInodes = di.Ffree dcinfo.FreeInodes = di.Ffree
dcinfo.FSType = di.FSType dcinfo.FSType = di.FSType
diskID, err := s.GetDiskID() diskID, err := s.GetDiskID()
if errors.Is(err, errUnformattedDisk) { // Healing is 'true' when
// if we found an unformatted disk then // - if we found an unformatted disk (no 'format.json')
// healing is automatically true. // - if we found healing tracker 'healing.bin'
dcinfo.Healing = true dcinfo.Healing = errors.Is(err, errUnformattedDisk) || (s.Healing() != nil)
} else {
// Check if the disk is being healed if GetDiskID
// returned any error other than fresh disk
dcinfo.Healing = s.Healing() != nil
}
dcinfo.ID = diskID dcinfo.ID = diskID
return dcinfo, err return dcinfo, err
} }
}) })
v, err := s.diskInfoCache.Get() v, err := s.diskInfoCache.Get()
info = v.(DiskInfo) if v != nil {
info = v.(DiskInfo)
}
return info, err return info, err
} }