mirror of
https://github.com/minio/minio.git
synced 2025-04-17 01:10:29 -04:00
metacache: Add trashcan (#10820)
Add trashcan that keeps recently updated lists after bucket deletion. All caches were deleted once a bucket was deleted, so caches still running would report errors. Now they are canceled. Fix `.minio.sys` not being transient.
This commit is contained in:
parent
8c76e1353e
commit
b9277c8030
@ -204,6 +204,7 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache {
|
|||||||
|
|
||||||
// Check if exists already.
|
// Check if exists already.
|
||||||
if c, ok := b.caches[o.ID]; ok {
|
if c, ok := b.caches[o.ID]; ok {
|
||||||
|
debugPrint("returning existing %v", o.ID)
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -279,6 +280,7 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache {
|
|||||||
best = o.newMetacache()
|
best = o.newMetacache()
|
||||||
b.caches[o.ID] = best
|
b.caches[o.ID] = best
|
||||||
b.updated = true
|
b.updated = true
|
||||||
|
debugPrint("returning new cache %s, bucket: %v", best.id, best.bucket)
|
||||||
return best
|
return best
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -442,17 +444,6 @@ func (b *bucketMetacache) deleteCache(id string) {
|
|||||||
}
|
}
|
||||||
b.mu.Unlock()
|
b.mu.Unlock()
|
||||||
if ok {
|
if ok {
|
||||||
ctx := context.Background()
|
c.delete(context.Background())
|
||||||
objAPI := newObjectLayerFn()
|
|
||||||
if objAPI == nil {
|
|
||||||
logger.LogIf(ctx, errors.New("bucketMetacache: no object layer"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ez, ok := objAPI.(*erasureServerSets)
|
|
||||||
if !ok {
|
|
||||||
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureServerSets"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(c.bucket, c.id))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,12 +32,14 @@ import (
|
|||||||
// Therefore no cluster locks are required.
|
// Therefore no cluster locks are required.
|
||||||
var localMetacacheMgr = &metacacheManager{
|
var localMetacacheMgr = &metacacheManager{
|
||||||
buckets: make(map[string]*bucketMetacache),
|
buckets: make(map[string]*bucketMetacache),
|
||||||
|
trash: make(map[string]metacache),
|
||||||
}
|
}
|
||||||
|
|
||||||
type metacacheManager struct {
|
type metacacheManager struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
init sync.Once
|
init sync.Once
|
||||||
buckets map[string]*bucketMetacache
|
buckets map[string]*bucketMetacache
|
||||||
|
trash map[string]metacache // Recently deleted lists.
|
||||||
}
|
}
|
||||||
|
|
||||||
const metacacheManagerTransientBucket = "**transient**"
|
const metacacheManagerTransientBucket = "**transient**"
|
||||||
@ -78,11 +80,56 @@ func (m *metacacheManager) initManager() {
|
|||||||
logger.LogIf(bg, v.save(bg))
|
logger.LogIf(bg, v.save(bg))
|
||||||
}
|
}
|
||||||
m.mu.RUnlock()
|
m.mu.RUnlock()
|
||||||
|
m.mu.Lock()
|
||||||
|
for k, v := range m.trash {
|
||||||
|
if time.Since(v.lastUpdate) > metacacheMaxRunningAge {
|
||||||
|
v.delete(context.Background())
|
||||||
|
delete(m.trash, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.mu.Unlock()
|
||||||
}
|
}
|
||||||
m.getTransient().deleteAll()
|
m.getTransient().deleteAll()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// findCache will get a metacache.
|
||||||
|
func (m *metacacheManager) findCache(ctx context.Context, o listPathOptions) metacache {
|
||||||
|
if o.Transient || isReservedOrInvalidBucket(o.Bucket, false) {
|
||||||
|
return m.getTransient().findCache(o)
|
||||||
|
}
|
||||||
|
m.mu.RLock()
|
||||||
|
b, ok := m.buckets[o.Bucket]
|
||||||
|
if ok {
|
||||||
|
m.mu.RUnlock()
|
||||||
|
return b.findCache(o)
|
||||||
|
}
|
||||||
|
if meta, ok := m.trash[o.ID]; ok {
|
||||||
|
m.mu.RUnlock()
|
||||||
|
return meta
|
||||||
|
}
|
||||||
|
m.mu.RUnlock()
|
||||||
|
return m.getBucket(ctx, o.Bucket).findCache(o)
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateCacheEntry will update non-transient state.
|
||||||
|
func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
if meta, ok := m.trash[update.id]; ok {
|
||||||
|
m.mu.RUnlock()
|
||||||
|
return meta, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
b, ok := m.buckets[update.bucket]
|
||||||
|
if ok {
|
||||||
|
m.mu.RUnlock()
|
||||||
|
return b.updateCacheEntry(update)
|
||||||
|
}
|
||||||
|
m.mu.RUnlock()
|
||||||
|
// We should have either a trashed bucket or this
|
||||||
|
return metacache{}, errVolumeNotFound
|
||||||
|
}
|
||||||
|
|
||||||
// getBucket will get a bucket metacache or load it from disk if needed.
|
// getBucket will get a bucket metacache or load it from disk if needed.
|
||||||
func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucketMetacache {
|
func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucketMetacache {
|
||||||
m.init.Do(m.initManager)
|
m.init.Do(m.initManager)
|
||||||
@ -118,6 +165,7 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
|
|||||||
b, err := loadBucketMetaCache(ctx, bucket)
|
b, err := loadBucketMetaCache(ctx, bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
|
logger.LogIf(ctx, err)
|
||||||
return m.getTransient()
|
return m.getTransient()
|
||||||
}
|
}
|
||||||
if b.bucket != bucket {
|
if b.bucket != bucket {
|
||||||
@ -130,22 +178,43 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
|
|||||||
|
|
||||||
// deleteBucketCache will delete the bucket cache if it exists.
|
// deleteBucketCache will delete the bucket cache if it exists.
|
||||||
func (m *metacacheManager) deleteBucketCache(bucket string) {
|
func (m *metacacheManager) deleteBucketCache(bucket string) {
|
||||||
|
m.init.Do(m.initManager)
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
|
||||||
b, ok := m.buckets[bucket]
|
b, ok := m.buckets[bucket]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
m.mu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
b.deleteAll()
|
|
||||||
delete(m.buckets, bucket)
|
delete(m.buckets, bucket)
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
// Since deletes may take some time we try to do it without
|
||||||
|
// holding lock to m all the time.
|
||||||
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
for k, v := range b.caches {
|
||||||
|
if time.Since(v.lastUpdate) > metacacheMaxRunningAge {
|
||||||
|
v.delete(context.Background())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
v.error = "Bucket deleted"
|
||||||
|
v.status = scanStateError
|
||||||
|
m.mu.Lock()
|
||||||
|
m.trash[k] = v
|
||||||
|
m.mu.Unlock()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteAll will delete all caches.
|
// deleteAll will delete all caches.
|
||||||
func (m *metacacheManager) deleteAll() {
|
func (m *metacacheManager) deleteAll() {
|
||||||
|
m.init.Do(m.initManager)
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
for _, b := range m.buckets {
|
for bucket, b := range m.buckets {
|
||||||
b.deleteAll()
|
b.deleteAll()
|
||||||
|
if !b.transient {
|
||||||
|
delete(m.buckets, bucket)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -165,9 +234,9 @@ func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTC
|
|||||||
o.Create = false
|
o.Create = false
|
||||||
var cache metacache
|
var cache metacache
|
||||||
if !o.Transient {
|
if !o.Transient {
|
||||||
if rpc == nil {
|
if rpc == nil || o.Transient {
|
||||||
// Local
|
// Local
|
||||||
cache = localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(o)
|
cache = localMetacacheMgr.findCache(ctx, o)
|
||||||
} else {
|
} else {
|
||||||
c, err := rpc.GetMetacacheListing(ctx, o)
|
c, err := rpc.GetMetacacheListing(ctx, o)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -175,8 +244,6 @@ func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTC
|
|||||||
}
|
}
|
||||||
cache = *c
|
cache = *c
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
cache = localMetacacheMgr.getTransient().findCache(o)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if cache.status == scanStateNone || cache.fileNotFound {
|
if cache.status == scanStateNone || cache.fileNotFound {
|
||||||
|
@ -91,9 +91,13 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
|
|||||||
o.OldestCycle = globalNotificationSys.findEarliestCleanBloomFilter(ctx, path.Join(o.Bucket, o.BaseDir))
|
o.OldestCycle = globalNotificationSys.findEarliestCleanBloomFilter(ctx, path.Join(o.Bucket, o.BaseDir))
|
||||||
var cache metacache
|
var cache metacache
|
||||||
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
|
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
|
||||||
if rpc == nil {
|
if isReservedOrInvalidBucket(o.Bucket, false) {
|
||||||
|
rpc = nil
|
||||||
|
o.Transient = true
|
||||||
|
}
|
||||||
|
if rpc == nil || o.Transient {
|
||||||
// Local
|
// Local
|
||||||
cache = localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(o)
|
cache = localMetacacheMgr.findCache(ctx, o)
|
||||||
} else {
|
} else {
|
||||||
c, err := rpc.GetMetacacheListing(ctx, o)
|
c, err := rpc.GetMetacacheListing(ctx, o)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -103,8 +107,8 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
|
|||||||
return entries, io.EOF
|
return entries, io.EOF
|
||||||
}
|
}
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
cache = localMetacacheMgr.getTransient().findCache(o)
|
|
||||||
o.Transient = true
|
o.Transient = true
|
||||||
|
cache = localMetacacheMgr.findCache(ctx, o)
|
||||||
} else {
|
} else {
|
||||||
cache = *c
|
cache = *c
|
||||||
}
|
}
|
||||||
|
@ -255,7 +255,7 @@ func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClien
|
|||||||
return localMetacacheMgr.getTransient().updateCacheEntry(m)
|
return localMetacacheMgr.getTransient().updateCacheEntry(m)
|
||||||
}
|
}
|
||||||
if rpc == nil {
|
if rpc == nil {
|
||||||
return localMetacacheMgr.getBucket(GlobalContext, o.Bucket).updateCacheEntry(m)
|
return localMetacacheMgr.updateCacheEntry(m)
|
||||||
}
|
}
|
||||||
return rpc.UpdateMetacacheListing(context.Background(), m)
|
return rpc.UpdateMetacacheListing(context.Background(), m)
|
||||||
}
|
}
|
||||||
|
@ -17,9 +17,14 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/minio/minio/cmd/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type scanStatus uint8
|
type scanStatus uint8
|
||||||
@ -132,3 +137,21 @@ func baseDirFromPrefix(prefix string) string {
|
|||||||
}
|
}
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// delete all cache data on disks.
|
||||||
|
func (m *metacache) delete(ctx context.Context) {
|
||||||
|
if m.bucket == "" || m.id == "" {
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("metacache.delete: bucket (%s) or id (%s) empty", m.bucket, m.id))
|
||||||
|
}
|
||||||
|
objAPI := newObjectLayerFn()
|
||||||
|
if objAPI == nil {
|
||||||
|
logger.LogIf(ctx, errors.New("metacache.delete: no object layer"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ez, ok := objAPI.(*erasureServerSets)
|
||||||
|
if !ok {
|
||||||
|
logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerSets"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))
|
||||||
|
}
|
||||||
|
@ -634,13 +634,7 @@ func (s *peerRESTServer) UpdateMetacacheListingHandler(w http.ResponseWriter, r
|
|||||||
s.writeErrorResponse(w, err)
|
s.writeErrorResponse(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
b := localMetacacheMgr.getBucket(ctx, req.bucket)
|
cache, err := localMetacacheMgr.updateCacheEntry(req)
|
||||||
if b == nil {
|
|
||||||
s.writeErrorResponse(w, errServerNotInitialized)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
cache, err := b.updateCacheEntry(req)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.writeErrorResponse(w, err)
|
s.writeErrorResponse(w, err)
|
||||||
return
|
return
|
||||||
|
Loading…
x
Reference in New Issue
Block a user