mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
Remove locks on usage cache (#16786)
This commit is contained in:
parent
b984bf8d1a
commit
a547bf517d
@ -23,6 +23,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -847,37 +848,54 @@ type objectIO interface {
|
|||||||
|
|
||||||
// load the cache content with name from minioMetaBackgroundOpsBucket.
|
// load the cache content with name from minioMetaBackgroundOpsBucket.
|
||||||
// Only backend errors are returned as errors.
|
// Only backend errors are returned as errors.
|
||||||
|
// The loader is optimistic and has no locking, but tries 5 times before giving up.
|
||||||
// If the object is not found or unable to deserialize d is cleared and nil error is returned.
|
// If the object is not found or unable to deserialize d is cleared and nil error is returned.
|
||||||
func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
|
func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
|
||||||
// Abandon if more than 5 minutes, so we don't hold up scanner.
|
// Abandon if more than 5 minutes, so we don't hold up scanner.
|
||||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, readLock, ObjectOptions{})
|
// Caches are read+written without locks,
|
||||||
if err != nil {
|
retries := 0
|
||||||
switch err.(type) {
|
for retries < 5 {
|
||||||
case ObjectNotFound:
|
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, noLock, ObjectOptions{NoLock: true})
|
||||||
case BucketNotFound:
|
if err != nil {
|
||||||
case InsufficientReadQuorum:
|
switch err.(type) {
|
||||||
case StorageErr:
|
case ObjectNotFound, BucketNotFound:
|
||||||
default:
|
case InsufficientReadQuorum, StorageErr:
|
||||||
return toObjectErr(err, dataUsageBucket, name)
|
retries++
|
||||||
|
time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
return toObjectErr(err, dataUsageBucket, name)
|
||||||
|
}
|
||||||
|
*d = dataUsageCache{}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
*d = dataUsageCache{}
|
if err := d.deserialize(r); err != nil {
|
||||||
|
r.Close()
|
||||||
|
retries++
|
||||||
|
time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
r.Close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
defer r.Close()
|
*d = dataUsageCache{}
|
||||||
if err := d.deserialize(r); err != nil {
|
|
||||||
*d = dataUsageCache{}
|
|
||||||
logger.LogOnceIf(ctx, err, err.Error())
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Maximum running concurrent saves on server.
|
||||||
|
var maxConcurrentScannerSaves = make(chan struct{}, 4)
|
||||||
|
|
||||||
// save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
|
// save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
|
||||||
|
// Note that no locking is done when saving.
|
||||||
func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
|
func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
|
||||||
var r io.Reader
|
var r io.Reader
|
||||||
|
maxConcurrentScannerSaves <- struct{}{}
|
||||||
|
defer func() {
|
||||||
|
<-maxConcurrentScannerSaves
|
||||||
|
}()
|
||||||
// If big, do streaming...
|
// If big, do streaming...
|
||||||
size := int64(-1)
|
size := int64(-1)
|
||||||
if len(d.Cache) > 10000 {
|
if len(d.Cache) > 10000 {
|
||||||
@ -909,7 +927,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
|
|||||||
dataUsageBucket,
|
dataUsageBucket,
|
||||||
name,
|
name,
|
||||||
NewPutObjReader(hr),
|
NewPutObjReader(hr),
|
||||||
ObjectOptions{})
|
ObjectOptions{NoLock: true})
|
||||||
if isErrBucketNotFound(err) {
|
if isErrBucketNotFound(err) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -449,6 +449,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
|
|||||||
// Start one scanner per disk
|
// Start one scanner per disk
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(len(disks))
|
wg.Add(len(disks))
|
||||||
|
|
||||||
for i := range disks {
|
for i := range disks {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
@ -518,7 +519,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
|
|||||||
if r := cache.root(); r != nil {
|
if r := cache.root(); r != nil {
|
||||||
root = cache.flatten(*r)
|
root = cache.flatten(*r)
|
||||||
}
|
}
|
||||||
t := time.Now()
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
@ -528,9 +528,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
|
|||||||
Entry: root,
|
Entry: root,
|
||||||
}:
|
}:
|
||||||
}
|
}
|
||||||
// We want to avoid synchronizing up all writes in case
|
|
||||||
// the results are piled up.
|
|
||||||
time.Sleep(time.Duration(float64(time.Since(t)) * rand.Float64()))
|
|
||||||
// Save cache
|
// Save cache
|
||||||
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
|
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user