mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
b3c56b53fb
To avoid large delays in metacache cleanup, use rename instead of recursive delete calls, renames are cheaper move the content to minioMetaTmpBucket and then cleanup this folder once in 24hrs instead. If the new cache can replace an existing one, we should let it replace since that is currently being saved anyways, this avoids pile up of 1000's of metacache entires for same listing calls that are not necessary to be stored on disk.
271 lines
6.6 KiB
Go
271 lines
6.6 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"runtime/debug"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
)
|
|
|
|
// localMetacacheMgr is the *local* manager for this peer.
|
|
// It should never be used directly since buckets are
|
|
// distributed deterministically.
|
|
// Therefore no cluster locks are required.
|
|
var localMetacacheMgr = &metacacheManager{
|
|
buckets: make(map[string]*bucketMetacache),
|
|
trash: make(map[string]metacache),
|
|
}
|
|
|
|
type metacacheManager struct {
|
|
mu sync.RWMutex
|
|
init sync.Once
|
|
buckets map[string]*bucketMetacache
|
|
trash map[string]metacache // Recently deleted lists.
|
|
}
|
|
|
|
const metacacheManagerTransientBucket = "**transient**"
|
|
const metacacheMaxEntries = 5000
|
|
|
|
// initManager will start async saving the cache.
|
|
func (m *metacacheManager) initManager() {
|
|
// Add a transient bucket.
|
|
tb := newBucketMetacache(metacacheManagerTransientBucket, false)
|
|
tb.transient = true
|
|
m.buckets[metacacheManagerTransientBucket] = tb
|
|
|
|
// Start saver when object layer is ready.
|
|
go func() {
|
|
objAPI := newObjectLayerFn()
|
|
for objAPI == nil {
|
|
time.Sleep(time.Second)
|
|
objAPI = newObjectLayerFn()
|
|
}
|
|
if !globalIsErasure {
|
|
logger.Info("metacacheManager was initialized in non-erasure mode, skipping save")
|
|
return
|
|
}
|
|
|
|
t := time.NewTicker(time.Minute)
|
|
defer t.Stop()
|
|
|
|
var exit bool
|
|
bg := context.Background()
|
|
for !exit {
|
|
select {
|
|
case <-t.C:
|
|
case <-GlobalContext.Done():
|
|
exit = true
|
|
}
|
|
m.mu.RLock()
|
|
for _, v := range m.buckets {
|
|
if !exit {
|
|
v.cleanup()
|
|
}
|
|
logger.LogIf(bg, v.save(bg))
|
|
}
|
|
m.mu.RUnlock()
|
|
m.mu.Lock()
|
|
for k, v := range m.trash {
|
|
if time.Since(v.lastUpdate) > metacacheMaxRunningAge {
|
|
v.delete(context.Background())
|
|
delete(m.trash, k)
|
|
}
|
|
}
|
|
m.mu.Unlock()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// findCache will get a metacache.
|
|
func (m *metacacheManager) findCache(ctx context.Context, o listPathOptions) metacache {
|
|
if o.Transient || isReservedOrInvalidBucket(o.Bucket, false) {
|
|
return m.getTransient().findCache(o)
|
|
}
|
|
m.mu.RLock()
|
|
b, ok := m.buckets[o.Bucket]
|
|
if ok {
|
|
m.mu.RUnlock()
|
|
return b.findCache(o)
|
|
}
|
|
if meta, ok := m.trash[o.ID]; ok {
|
|
m.mu.RUnlock()
|
|
return meta
|
|
}
|
|
m.mu.RUnlock()
|
|
return m.getBucket(ctx, o.Bucket).findCache(o)
|
|
}
|
|
|
|
// updateCacheEntry will update non-transient state.
|
|
func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error) {
|
|
m.mu.RLock()
|
|
if meta, ok := m.trash[update.id]; ok {
|
|
m.mu.RUnlock()
|
|
return meta, nil
|
|
}
|
|
|
|
b, ok := m.buckets[update.bucket]
|
|
m.mu.RUnlock()
|
|
if ok {
|
|
return b.updateCacheEntry(update)
|
|
}
|
|
|
|
// We should have either a trashed bucket or this
|
|
return metacache{}, errVolumeNotFound
|
|
}
|
|
|
|
// getBucket will get a bucket metacache or load it from disk if needed.
|
|
func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucketMetacache {
|
|
m.init.Do(m.initManager)
|
|
|
|
// Return a transient bucket for invalid or system buckets.
|
|
if isReservedOrInvalidBucket(bucket, false) {
|
|
return m.getTransient()
|
|
}
|
|
m.mu.RLock()
|
|
b, ok := m.buckets[bucket]
|
|
m.mu.RUnlock()
|
|
if ok {
|
|
if b.bucket != bucket {
|
|
logger.Info("getBucket: cached bucket %s does not match this bucket %s", b.bucket, bucket)
|
|
debug.PrintStack()
|
|
}
|
|
return b
|
|
}
|
|
|
|
m.mu.Lock()
|
|
// See if someone else fetched it while we waited for the lock.
|
|
b, ok = m.buckets[bucket]
|
|
if ok {
|
|
m.mu.Unlock()
|
|
if b.bucket != bucket {
|
|
logger.Info("getBucket: newly cached bucket %s does not match this bucket %s", b.bucket, bucket)
|
|
debug.PrintStack()
|
|
}
|
|
return b
|
|
}
|
|
|
|
// Load bucket. If we fail return the transient bucket.
|
|
b, err := loadBucketMetaCache(ctx, bucket)
|
|
if err != nil {
|
|
m.mu.Unlock()
|
|
logger.LogIf(ctx, err)
|
|
return m.getTransient()
|
|
}
|
|
if b.bucket != bucket {
|
|
logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket))
|
|
}
|
|
m.buckets[bucket] = b
|
|
m.mu.Unlock()
|
|
return b
|
|
}
|
|
|
|
// deleteBucketCache will delete the bucket cache if it exists.
|
|
func (m *metacacheManager) deleteBucketCache(bucket string) {
|
|
m.init.Do(m.initManager)
|
|
m.mu.Lock()
|
|
b, ok := m.buckets[bucket]
|
|
if !ok {
|
|
m.mu.Unlock()
|
|
return
|
|
}
|
|
delete(m.buckets, bucket)
|
|
m.mu.Unlock()
|
|
|
|
// Since deletes may take some time we try to do it without
|
|
// holding lock to m all the time.
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
for k, v := range b.caches {
|
|
if time.Since(v.lastUpdate) > metacacheMaxRunningAge {
|
|
v.delete(context.Background())
|
|
continue
|
|
}
|
|
v.error = "Bucket deleted"
|
|
v.status = scanStateError
|
|
m.mu.Lock()
|
|
m.trash[k] = v
|
|
m.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// deleteAll will delete all caches.
|
|
func (m *metacacheManager) deleteAll() {
|
|
m.init.Do(m.initManager)
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
for bucket, b := range m.buckets {
|
|
b.deleteAll()
|
|
if !b.transient {
|
|
delete(m.buckets, bucket)
|
|
}
|
|
}
|
|
}
|
|
|
|
// getTransient will return a transient bucket.
|
|
func (m *metacacheManager) getTransient() *bucketMetacache {
|
|
m.init.Do(m.initManager)
|
|
m.mu.RLock()
|
|
bmc := m.buckets[metacacheManagerTransientBucket]
|
|
m.mu.RUnlock()
|
|
return bmc
|
|
}
|
|
|
|
// checkMetacacheState should be used if data is not updating.
|
|
// Should only be called if a failure occurred.
|
|
func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTClient) error {
|
|
// We operate on a copy...
|
|
o.Create = false
|
|
var cache metacache
|
|
if rpc == nil || o.Transient {
|
|
cache = localMetacacheMgr.findCache(ctx, o)
|
|
} else {
|
|
c, err := rpc.GetMetacacheListing(ctx, o)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cache = *c
|
|
}
|
|
|
|
if cache.status == scanStateNone || cache.fileNotFound {
|
|
return errFileNotFound
|
|
}
|
|
if cache.status == scanStateSuccess || cache.status == scanStateStarted {
|
|
if time.Since(cache.lastUpdate) > metacacheMaxRunningAge {
|
|
// We got a stale entry, mark error on handling server.
|
|
err := fmt.Errorf("timeout: list %s not updated", cache.id)
|
|
cache.error = err.Error()
|
|
cache.status = scanStateError
|
|
if rpc == nil || o.Transient {
|
|
localMetacacheMgr.updateCacheEntry(cache)
|
|
} else {
|
|
rpc.UpdateMetacacheListing(ctx, cache)
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
if cache.error != "" {
|
|
return fmt.Errorf("async cache listing failed with: %s", cache.error)
|
|
}
|
|
return nil
|
|
}
|