Mirror of https://github.com/minio/minio.git (synced 2025-04-24 04:10:43 -04:00)
allow caller context during reloads() to cancel (#19687)
Canceled callers might linger around longer and can potentially overwhelm the system. Instead, provide a caller context so canceled callers do not hold on to resources. Bonus: we have no reason to cache errors; we should never cache errors, otherwise quorum errors can creep in unexpectedly. When invalidating, the cache should hit the actual resources instead.
This commit is contained in:
parent 67bd71b7a5
commit 9a267f9270
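The core pattern of the change, shown as a minimal standalone Go sketch (the ctxCache type and its names are illustrative assumptions, not MinIO's actual cachevalue API): the cache's update function receives the caller's context, so a canceled caller aborts its own fetch instead of lingering, and errors are never stored, so the next caller goes back to the real resource.

// A minimal, self-contained sketch of the pattern this commit moves to; the
// names (ctxCache, load, get) are illustrative only, not MinIO's cachevalue API.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type ctxCache[T any] struct {
	mu       sync.Mutex
	val      *T
	loadedAt time.Time
	ttl      time.Duration
	load     func(ctx context.Context) (T, error)
}

// get returns the cached value while it is fresh; otherwise it calls load with
// the caller's context. On error nothing is cached, so the next caller retries
// against the real resource instead of being served a stale error.
func (c *ctxCache[T]) get(ctx context.Context) (T, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.val != nil && time.Since(c.loadedAt) < c.ttl {
		return *c.val, nil
	}
	v, err := c.load(ctx)
	if err != nil {
		var zero T
		return zero, err // never cache errors
	}
	c.val, c.loadedAt = &v, time.Now()
	return v, nil
}

func main() {
	c := &ctxCache[string]{
		ttl: time.Second,
		load: func(ctx context.Context) (string, error) {
			select {
			case <-time.After(100 * time.Millisecond): // pretend to hit the backend
				return "usage-info", nil
			case <-ctx.Done():
				return "", ctx.Err() // canceled caller stops waiting, nothing cached
			}
		},
	}

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println(c.get(ctx))                  // context.Canceled, nothing cached
	fmt.Println(c.get(context.Background())) // loads and caches "usage-info"
}

The real cachevalue.Cache in the diff below layers TTL-based refresh, NoWait background updates, and ReturnLastGood on top of this basic shape.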
@@ -1217,9 +1217,9 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
 	}
 
 	bucketStorageCache.InitOnce(10*time.Second,
-		cachevalue.Opts{ReturnLastGood: true, NoWait: true},
-		func() (DataUsageInfo, error) {
-			ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
+		cachevalue.Opts{ReturnLastGood: true},
+		func(ctx context.Context) (DataUsageInfo, error) {
+			ctx, done := context.WithTimeout(ctx, 2*time.Second)
 			defer done()
 
 			return loadDataUsageFromBackend(ctx, objectAPI)
@@ -49,8 +49,8 @@ var bucketStorageCache = cachevalue.New[DataUsageInfo]()
 func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
 	bucketStorageCache.InitOnce(10*time.Second,
 		cachevalue.Opts{ReturnLastGood: true, NoWait: true},
-		func() (DataUsageInfo, error) {
-			ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
+		func(ctx context.Context) (DataUsageInfo, error) {
+			ctx, done := context.WithTimeout(ctx, 2*time.Second)
 			defer done()
 
 			return loadDataUsageFromBackend(ctx, objAPI)
@@ -59,8 +59,8 @@ func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
 }
 
 // GetBucketUsageInfo return bucket usage info for a given bucket
-func (sys *BucketQuotaSys) GetBucketUsageInfo(bucket string) (BucketUsageInfo, error) {
-	dui, err := bucketStorageCache.Get()
+func (sys *BucketQuotaSys) GetBucketUsageInfo(ctx context.Context, bucket string) (BucketUsageInfo, error) {
+	dui, err := bucketStorageCache.GetWithCtx(ctx)
 	timedout := OperationTimedOut{}
 	if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &timedout) {
 		if len(dui.BucketsUsage) > 0 {
@@ -118,7 +118,7 @@ func (sys *BucketQuotaSys) enforceQuotaHard(ctx context.Context, bucket string,
 		return BucketQuotaExceeded{Bucket: bucket}
 	}
 
-	bui, err := sys.GetBucketUsageInfo(bucket)
+	bui, err := sys.GetBucketUsageInfo(ctx, bucket)
 	if err != nil {
 		return err
 	}
@@ -79,12 +79,12 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
 	prefixUsageCache.InitOnce(30*time.Second,
 		// No need to fail upon Update() error, fallback to old value.
 		cachevalue.Opts{ReturnLastGood: true, NoWait: true},
-		func() (map[string]uint64, error) {
+		func(ctx context.Context) (map[string]uint64, error) {
 			m := make(map[string]uint64)
 			for _, pool := range z.serverPools {
 				for _, er := range pool.sets {
 					// Load bucket usage prefixes
-					ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
+					ctx, done := context.WithTimeout(ctx, 2*time.Second)
 					ok := cache.load(ctx, er, bucket+slashSeparator+dataUsageCacheName) == nil
 					done()
 					if ok {
@@ -107,7 +107,7 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
 		},
 	)
 
-	return prefixUsageCache.Get()
+	return prefixUsageCache.GetWithCtx(ctx)
 }
 
 func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
@@ -1962,8 +1962,8 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
 	if opts.Cached {
 		listBucketsCache.InitOnce(time.Second,
 			cachevalue.Opts{ReturnLastGood: true, NoWait: true},
-			func() ([]BucketInfo, error) {
-				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			func(ctx context.Context) ([]BucketInfo, error) {
+				ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 				defer cancel()
 
 				buckets, err = z.s3Peer.ListBuckets(ctx, opts)
@@ -1980,7 +1980,7 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
 			},
 		)
 
-		return listBucketsCache.Get()
+		return listBucketsCache.GetWithCtx(ctx)
 	}
 
 	buckets, err = z.s3Peer.ListBuckets(ctx, opts)
@@ -361,7 +361,7 @@ type MetricsGroupOpts struct {
 func (g *MetricsGroupV2) RegisterRead(read func(context.Context) []MetricV2) {
 	g.metricsCache = cachevalue.NewFromFunc(g.cacheInterval,
 		cachevalue.Opts{ReturnLastGood: true},
-		func() ([]MetricV2, error) {
+		func(ctx context.Context) ([]MetricV2, error) {
 			if g.metricsGroupOpts.dependGlobalObjectAPI {
 				objLayer := newObjectLayerFn()
 				// Service not initialized yet
@@ -18,6 +18,7 @@
 package cmd
 
 import (
+	"context"
 	"sync"
 	"time"
 
@@ -57,7 +58,7 @@ type nodesOnline struct {
 }
 
 func newNodesUpDownCache() *cachevalue.Cache[nodesOnline] {
-	loadNodesUpDown := func() (v nodesOnline, err error) {
+	loadNodesUpDown := func(ctx context.Context) (v nodesOnline, err error) {
 		v.Online, v.Offline = globalNotificationSys.GetPeerOnlineCount()
 		return
 	}
@@ -84,7 +85,7 @@ type storageMetrics struct {
 }
 
 func newDataUsageInfoCache() *cachevalue.Cache[DataUsageInfo] {
-	loadDataUsage := func() (u DataUsageInfo, err error) {
+	loadDataUsage := func(ctx context.Context) (u DataUsageInfo, err error) {
 		objLayer := newObjectLayerFn()
 		if objLayer == nil {
 			return
@@ -100,7 +101,7 @@ func newDataUsageInfoCache() *cachevalue.Cache[DataUsageInfo] {
 }
 
 func newESetHealthResultCache() *cachevalue.Cache[HealthResult] {
-	loadHealth := func() (r HealthResult, err error) {
+	loadHealth := func(ctx context.Context) (r HealthResult, err error) {
 		objLayer := newObjectLayerFn()
 		if objLayer == nil {
 			return
@@ -157,7 +158,7 @@ func newDriveMetricsCache() *cachevalue.Cache[storageMetrics] {
 		prevDriveIOStatsRefreshedAt time.Time
 	)
 
-	loadDriveMetrics := func() (v storageMetrics, err error) {
+	loadDriveMetrics := func(ctx context.Context) (v storageMetrics, err error) {
 		objLayer := newObjectLayerFn()
 		if objLayer == nil {
 			return
@@ -203,7 +204,7 @@ func newDriveMetricsCache() *cachevalue.Cache[storageMetrics] {
 }
 
 func newCPUMetricsCache() *cachevalue.Cache[madmin.CPUMetrics] {
-	loadCPUMetrics := func() (v madmin.CPUMetrics, err error) {
+	loadCPUMetrics := func(ctx context.Context) (v madmin.CPUMetrics, err error) {
 		var types madmin.MetricType = madmin.MetricsCPU
 
 		m := collectLocalMetrics(types, collectMetricsOpts{
@@ -228,7 +229,7 @@ func newCPUMetricsCache() *cachevalue.Cache[madmin.CPUMetrics] {
 }
 
 func newMemoryMetricsCache() *cachevalue.Cache[madmin.MemInfo] {
-	loadMemoryMetrics := func() (v madmin.MemInfo, err error) {
+	loadMemoryMetrics := func(ctx context.Context) (v madmin.MemInfo, err error) {
 		var types madmin.MetricType = madmin.MetricsMem
 
 		m := collectLocalMetrics(types, collectMetricsOpts{
@@ -253,7 +254,7 @@ func newMemoryMetricsCache() *cachevalue.Cache[madmin.MemInfo] {
 }
 
 func newClusterStorageInfoCache() *cachevalue.Cache[storageMetrics] {
-	loadStorageInfo := func() (v storageMetrics, err error) {
+	loadStorageInfo := func(ctx context.Context) (v storageMetrics, err error) {
 		objLayer := newObjectLayerFn()
 		if objLayer == nil {
 			return storageMetrics{}, nil
@@ -116,7 +116,7 @@ func (client *peerRESTClient) call(method string, values url.Values, body io.Rea
 // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
 // after verifying format.json
 func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
-	if client == nil || !client.IsOnline() {
+	if client == nil {
 		return nil, errPeerNotReachable
 	}
 
@@ -129,6 +129,10 @@ func (client *peerRESTClient) callWithContext(ctx context.Context, method string
 		return respBody, nil
 	}
 
+	if xnet.IsNetworkOrHostDown(err, true) {
+		return nil, errPeerNotReachable
+	}
+
 	return nil, err
 }
 
@@ -139,7 +143,11 @@ func (client *peerRESTClient) String() string {
 
 // IsOnline returns true if the peer client is online.
 func (client *peerRESTClient) IsOnline() bool {
-	return client.restClient.IsOnline()
+	conn := client.gridConn()
+	if conn == nil {
+		return false
+	}
+	return client.restClient.IsOnline() || conn.State() == grid.StateConnected
 }
 
 // Close - marks the client as closed.
@@ -191,8 +191,13 @@ func (client *storageRESTClient) String() string {
 	return client.endpoint.String()
 }
 
-// IsOnline - returns whether RPC client failed to connect or not.
+// IsOnline - returns whether client failed to connect or not.
 func (client *storageRESTClient) IsOnline() bool {
+	return client.restClient.IsOnline() || client.IsOnlineWS()
+}
+
+// IsOnlineWS - returns whether websocket client failed to connect or not.
+func (client *storageRESTClient) IsOnlineWS() bool {
 	return client.gridConn.State() == grid.StateConnected
 }
 
@@ -254,7 +259,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
 }
 
 func (client *storageRESTClient) GetDiskID() (string, error) {
-	if !client.IsOnline() {
+	if !client.IsOnlineWS() {
 		// make sure to check if the disk is offline, since the underlying
 		// value is cached we should attempt to invalidate it if such calls
 		// were attempted. This can lead to false success under certain conditions
@@ -275,7 +280,7 @@ func (client *storageRESTClient) SetDiskID(id string) {
 }
 
 func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOptions) (info DiskInfo, err error) {
-	if !client.IsOnline() {
+	if !client.IsOnlineWS() {
 		// make sure to check if the disk is offline, since the underlying
 		// value is cached we should attempt to invalidate it if such calls
 		// were attempted. This can lead to false success under certain conditions
@@ -302,10 +307,9 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
 		return info, nil
 	} // In all other cases cache the value upto 1sec.
 
-	client.diskInfoCache.InitOnce(time.Second,
-		cachevalue.Opts{CacheError: true},
-		func() (info DiskInfo, err error) {
-			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	client.diskInfoCache.InitOnce(time.Second, cachevalue.Opts{},
+		func(ctx context.Context) (info DiskInfo, err error) {
+			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 			defer cancel()
 
 			nopts := DiskInfoOptions{DiskID: *client.diskID.Load(), Metrics: true}
@@ -321,7 +325,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
 		},
 	)
 
-	return client.diskInfoCache.Get()
+	return client.diskInfoCache.GetWithCtx(ctx)
 }
 
 // MakeVolBulk - create multiple volumes in a bulk operation.
@@ -156,7 +156,7 @@ func veeamSOSAPIGetObject(ctx context.Context, bucket, object string, rs *HTTPRa
 	}
 
 	q, _ := globalBucketQuotaSys.Get(ctx, bucket)
-	binfo, _ := globalBucketQuotaSys.GetBucketUsageInfo(bucket)
+	binfo, _ := globalBucketQuotaSys.GetBucketUsageInfo(ctx, bucket)
 
 	ci := capacityInfo{
 		Used: int64(binfo.Size),
@@ -99,7 +99,7 @@ type xlStorageDiskIDCheck struct {
 func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
 	p.metricsCache.InitOnce(5*time.Second,
 		cachevalue.Opts{},
-		func() (DiskMetrics, error) {
+		func(ctx context.Context) (DiskMetrics, error) {
 			diskMetric := DiskMetrics{
 				LastMinute: make(map[string]AccElem, len(p.apiLatencies)),
 				APICalls:   make(map[string]uint64, len(p.apiCalls)),
@@ -114,7 +114,7 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
 		},
 	)
 
-	diskMetric, _ := p.metricsCache.Get()
+	diskMetric, _ := p.metricsCache.GetWithCtx(context.Background())
 	// Do not need this value to be cached.
 	diskMetric.TotalErrorsTimeout = p.totalErrsTimeout.Load()
 	diskMetric.TotalErrorsAvailability = p.totalErrsAvailability.Load()
@@ -331,7 +331,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
 
 	// Initialize DiskInfo cache
 	s.diskInfoCache.InitOnce(time.Second, cachevalue.Opts{},
-		func() (DiskInfo, error) {
+		func(ctx context.Context) (DiskInfo, error) {
 			dcinfo := DiskInfo{}
 			di, err := getDiskInfo(s.drivePath)
 			if err != nil {
@@ -752,8 +752,8 @@ func (s *xlStorage) setWriteAttribute(writeCount uint64) error {
 
 // DiskInfo provides current information about disk space usage,
 // total free inodes and underlying filesystem.
-func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
-	info, err = s.diskInfoCache.Get()
+func (s *xlStorage) DiskInfo(ctx context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
+	info, err = s.diskInfoCache.GetWithCtx(ctx)
 	info.NRRequests = s.nrRequests
 	info.Rotational = s.rotational
 	info.MountPath = s.drivePath
@@ -18,6 +18,7 @@
 package cachevalue
 
 import (
+	"context"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -30,11 +31,6 @@ type Opts struct {
 	// Returns the last good value AND the error.
 	ReturnLastGood bool
 
-	// If CacheError is set, errors will be cached as well
-	// and not continuously try to update.
-	// Should not be combined with ReturnLastGood.
-	CacheError bool
-
 	// If NoWait is set, Get() will return the last good value,
 	// if TTL has expired but 2x TTL has not yet passed,
 	// but will fetch a new value in the background.
@@ -50,7 +46,7 @@ type Cache[T any] struct {
 	// Only one caller will call this function at any time, others will be blocking.
 	// The returned value can no longer be modified once returned.
 	// Should be set before calling Get().
-	updateFn func() (T, error)
+	updateFn func(ctx context.Context) (T, error)
 
 	// ttl for a cached value.
 	ttl time.Duration
@@ -62,10 +58,7 @@ type Cache[T any] struct {
 	Once sync.Once
 
 	// Managed values.
-	valErr atomic.Pointer[struct {
-		v T
-		e error
-	}]
+	val atomic.Pointer[T]
 	lastUpdateMs atomic.Int64
 	updating sync.Mutex
 }
@@ -78,7 +71,7 @@ func New[T any]() *Cache[T] {
 
 // NewFromFunc allocates a new cached value instance and initializes it with an
 // update function, making it ready for use.
-func NewFromFunc[T any](ttl time.Duration, opts Opts, update func() (T, error)) *Cache[T] {
+func NewFromFunc[T any](ttl time.Duration, opts Opts, update func(ctx context.Context) (T, error)) *Cache[T] {
 	return &Cache[T]{
 		ttl: ttl,
 		updateFn: update,
@@ -88,7 +81,7 @@ func NewFromFunc[T any](ttl time.Duration, opts Opts, update func() (T, error))
 
 // InitOnce initializes the cache with a TTL and an update function. It is
 // guaranteed to be called only once.
-func (t *Cache[T]) InitOnce(ttl time.Duration, opts Opts, update func() (T, error)) {
+func (t *Cache[T]) InitOnce(ttl time.Duration, opts Opts, update func(ctx context.Context) (T, error)) {
 	t.Once.Do(func() {
 		t.ttl = ttl
 		t.updateFn = update
@@ -96,61 +89,68 @@ func (t *Cache[T]) InitOnce(ttl time.Duration, opts Opts, update func() (T, erro
 	})
 }
 
-// Get will return a cached value or fetch a new one.
+// GetWithCtx will return a cached value or fetch a new one.
+// passes a caller context, if caller context cancels nothing
+// is cached.
 // If the Update function returns an error the value is forwarded as is and not cached.
-func (t *Cache[T]) Get() (T, error) {
-	v := t.valErr.Load()
+func (t *Cache[T]) GetWithCtx(ctx context.Context) (T, error) {
+	v := t.val.Load()
 	ttl := t.ttl
 	vTime := t.lastUpdateMs.Load()
 	tNow := time.Now().UnixMilli()
 	if v != nil && tNow-vTime < ttl.Milliseconds() {
-		if v.e == nil {
-			return v.v, nil
-		}
-		if v.e != nil && t.opts.CacheError || t.opts.ReturnLastGood {
-			return v.v, v.e
-		}
+		return *v, nil
 	}
 
-	// Fetch new value.
-	if t.opts.NoWait && v != nil && tNow-vTime < ttl.Milliseconds()*2 && (v.e == nil || t.opts.CacheError) {
+	// Fetch new value asynchronously, while we do not return an error
+	// if v != nil value or
+	if t.opts.NoWait && v != nil && tNow-vTime < ttl.Milliseconds()*2 {
 		if t.updating.TryLock() {
 			go func() {
 				defer t.updating.Unlock()
-				t.update()
+				t.update(context.Background())
 			}()
 		}
-		return v.v, v.e
+		return *v, nil
 	}
 
 	// Get lock. Either we get it or we wait for it.
 	t.updating.Lock()
+	defer t.updating.Unlock()
+
 	if time.Since(time.UnixMilli(t.lastUpdateMs.Load())) < ttl {
 		// There is a new value, release lock and return it.
-		v = t.valErr.Load()
-		t.updating.Unlock()
-		return v.v, v.e
+		if v = t.val.Load(); v != nil {
+			return *v, nil
+		}
 	}
-	t.update()
-	v = t.valErr.Load()
-	t.updating.Unlock()
-	return v.v, v.e
+
+	if err := t.update(ctx); err != nil {
+		var empty T
+		return empty, err
+	}
+
+	return *t.val.Load(), nil
 }
 
-func (t *Cache[T]) update() {
-	val, err := t.updateFn()
+// Get will return a cached value or fetch a new one.
+// If the Update function returns an error the value is forwarded as is and not cached.
+func (t *Cache[T]) Get() (T, error) {
+	return t.GetWithCtx(context.Background())
+}
+
+func (t *Cache[T]) update(ctx context.Context) error {
+	val, err := t.updateFn(ctx)
 	if err != nil {
-		if t.opts.ReturnLastGood {
-			// Keep last good value.
-			v := t.valErr.Load()
-			if v != nil {
-				val = v.v
-			}
+		if t.opts.ReturnLastGood && t.val.Load() != nil {
+			// Keep last good value, so update
+			// does not return an error.
+			return nil
 		}
+		return err
 	}
-	t.valErr.Store(&struct {
-		v T
-		e error
-	}{v: val, e: err})
+
+	t.val.Store(&val)
 	t.lastUpdateMs.Store(time.Now().UnixMilli())
+	return nil
 }
@@ -18,15 +18,76 @@
 package cachevalue
 
 import (
+	"context"
+	"errors"
 	"testing"
 	"time"
 )
 
+func slowCaller(ctx context.Context) error {
+	sl := time.NewTimer(time.Second)
+	defer sl.Stop()
+
+	select {
+	case <-sl.C:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	return nil
+}
+
+func TestCacheCtx(t *testing.T) {
+	cache := New[time.Time]()
+	t.Parallel()
+	cache.InitOnce(2*time.Second, Opts{},
+		func(ctx context.Context) (time.Time, error) {
+			return time.Now(), slowCaller(ctx)
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel() // cancel context to test.
+
+	_, err := cache.GetWithCtx(ctx)
+	if !errors.Is(err, context.Canceled) {
+		t.Fatalf("expected context.Canceled err, got %v", err)
+	}
+
+	ctx, cancel = context.WithCancel(context.Background())
+	defer cancel()
+
+	t1, err := cache.GetWithCtx(ctx)
+	if err != nil {
+		t.Fatalf("expected nil err, got %v", err)
+	}
+
+	t2, err := cache.GetWithCtx(ctx)
+	if err != nil {
+		t.Fatalf("expected nil err, got %v", err)
+	}
+
+	if !t1.Equal(t2) {
+		t.Fatalf("expected time to be equal: %s != %s", t1, t2)
+	}
+
+	time.Sleep(3 * time.Second)
+
+	t3, err := cache.GetWithCtx(ctx)
+	if err != nil {
+		t.Fatalf("expected nil err, got %v", err)
+	}
+
+	if t1.Equal(t3) {
+		t.Fatalf("expected time to be un-equal: %s == %s", t1, t3)
+	}
+}
+
 func TestCache(t *testing.T) {
 	cache := New[time.Time]()
 	t.Parallel()
 	cache.InitOnce(2*time.Second, Opts{},
-		func() (time.Time, error) {
+		func(ctx context.Context) (time.Time, error) {
 			return time.Now(), nil
 		},
 	)
@@ -50,7 +111,7 @@ func TestCache(t *testing.T) {
 func BenchmarkCache(b *testing.B) {
 	cache := New[time.Time]()
 	cache.InitOnce(1*time.Millisecond, Opts{},
-		func() (time.Time, error) {
+		func(ctx context.Context) (time.Time, error) {
 			return time.Now(), nil
 		},
 	)