mirror of
https://github.com/minio/minio.git
synced 2025-02-03 09:55:59 -05:00
f0819cce75
On extremely long running listings keep the transient list 15 minutes after last update instead of using start time. Also don't do overlap checks on transient lists.
457 lines
12 KiB
Go
457 lines
12 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"runtime/debug"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/klauspost/compress/s2"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/hash"
|
|
"github.com/tinylib/msgp/msgp"
|
|
)
|
|
|
|
//go:generate msgp -file $GOFILE -unexported
|
|
|
|
// a bucketMetacache keeps track of all caches generated
|
|
// for a bucket.
|
|
type bucketMetacache struct {
|
|
// Name of bucket
|
|
bucket string
|
|
|
|
// caches indexed by id.
|
|
caches map[string]metacache
|
|
|
|
// Internal state
|
|
mu sync.RWMutex `msg:"-"`
|
|
updated bool `msg:"-"`
|
|
transient bool `msg:"-"` // bucket used for non-persisted caches.
|
|
}
|
|
|
|
// newBucketMetacache creates a new bucketMetacache.
|
|
// Optionally remove all existing caches.
|
|
func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
|
|
if cleanup {
|
|
// Recursively delete all caches.
|
|
objAPI := newObjectLayerFn()
|
|
ez, ok := objAPI.(*erasureServerSets)
|
|
if ok {
|
|
ctx := context.Background()
|
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
|
|
}
|
|
}
|
|
return &bucketMetacache{
|
|
bucket: bucket,
|
|
caches: make(map[string]metacache, 10),
|
|
}
|
|
}
|
|
|
|
// loadBucketMetaCache will load the cache from the object layer.
|
|
// If the cache cannot be found a new one is created.
|
|
func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) {
|
|
objAPI := newObjectLayerFn()
|
|
for objAPI == nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case <-time.After(250 * time.Millisecond):
|
|
}
|
|
objAPI = newObjectLayerFn()
|
|
if objAPI == nil {
|
|
logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket))
|
|
}
|
|
}
|
|
var meta bucketMetacache
|
|
var decErr error
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
|
|
r, w := io.Pipe()
|
|
go func() {
|
|
defer wg.Done()
|
|
dec := s2DecPool.Get().(*s2.Reader)
|
|
dec.Reset(r)
|
|
decErr = meta.DecodeMsg(msgp.NewReader(dec))
|
|
dec.Reset(nil)
|
|
s2DecPool.Put(dec)
|
|
r.CloseWithError(decErr)
|
|
}()
|
|
// Use global context for this.
|
|
err := objAPI.GetObject(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), 0, -1, w, "", ObjectOptions{})
|
|
logger.LogIf(ctx, w.CloseWithError(err))
|
|
if err != nil {
|
|
if isErrObjectNotFound(err) {
|
|
err = nil
|
|
} else {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
if errors.Is(err, InsufficientReadQuorum{}) {
|
|
// Cache is likely lost. Clean up and return new.
|
|
return newBucketMetacache(bucket, true), nil
|
|
}
|
|
return newBucketMetacache(bucket, false), err
|
|
}
|
|
wg.Wait()
|
|
if decErr != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
return newBucketMetacache(bucket, false), err
|
|
}
|
|
// Log the error, but assume the data is lost and return a fresh bucket.
|
|
// Otherwise a broken cache will never recover.
|
|
logger.LogIf(ctx, decErr)
|
|
return newBucketMetacache(bucket, true), nil
|
|
}
|
|
// Sanity check...
|
|
if meta.bucket != bucket {
|
|
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
|
|
return newBucketMetacache(bucket, true), nil
|
|
}
|
|
return &meta, nil
|
|
}
|
|
|
|
// save the bucket cache to the object storage.
|
|
func (b *bucketMetacache) save(ctx context.Context) error {
|
|
if b.transient {
|
|
return nil
|
|
}
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
// Keep lock while we marshal.
|
|
// We need a write lock since we update 'updated'
|
|
b.mu.Lock()
|
|
if !b.updated {
|
|
b.mu.Unlock()
|
|
return nil
|
|
}
|
|
// Save as s2 compressed msgpack
|
|
tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize()))
|
|
enc := s2.NewWriter(tmp)
|
|
err := msgp.Encode(enc, b)
|
|
if err != nil {
|
|
b.mu.Unlock()
|
|
return err
|
|
}
|
|
err = enc.Close()
|
|
if err != nil {
|
|
b.mu.Unlock()
|
|
return err
|
|
}
|
|
b.updated = false
|
|
b.mu.Unlock()
|
|
|
|
hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len()), false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr, nil, nil), ObjectOptions{})
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
|
|
// findCache will attempt to find a matching cache for the provided options.
|
|
// If a cache with the same ID exists already it will be returned.
|
|
// If none can be found a new is created with the provided ID.
|
|
func (b *bucketMetacache) findCache(o listPathOptions) metacache {
|
|
if b == nil {
|
|
logger.Info("bucketMetacache.findCache: nil cache for bucket %s", o.Bucket)
|
|
return metacache{}
|
|
}
|
|
|
|
if o.Bucket != b.bucket && !b.transient {
|
|
logger.Info("bucketMetacache.findCache: bucket %s does not match this bucket %s", o.Bucket, b.bucket)
|
|
debug.PrintStack()
|
|
return metacache{}
|
|
}
|
|
|
|
debugPrint := func(msg string, data ...interface{}) {}
|
|
if false {
|
|
debugPrint = logger.Info
|
|
}
|
|
|
|
// Grab a write lock, since we create one if we cannot find one.
|
|
if o.Create {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
} else {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
}
|
|
|
|
// Check if exists already.
|
|
if c, ok := b.caches[o.ID]; ok {
|
|
debugPrint("returning existing %v", o.ID)
|
|
return c
|
|
}
|
|
|
|
var best metacache
|
|
for _, cached := range b.caches {
|
|
// Never return transient caches if there is no id.
|
|
if b.transient {
|
|
break
|
|
}
|
|
if cached.status == scanStateError || cached.dataVersion != metacacheStreamVersion {
|
|
debugPrint("cache %s state or stream version mismatch", cached.id)
|
|
continue
|
|
}
|
|
if cached.startedCycle < o.OldestCycle {
|
|
debugPrint("cache %s cycle too old", cached.id)
|
|
continue
|
|
}
|
|
// Root of what we are looking for must at least have
|
|
if !strings.HasPrefix(o.BaseDir, cached.root) {
|
|
debugPrint("cache %s prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir)
|
|
continue
|
|
}
|
|
// If the existing listing wasn't recursive root must match.
|
|
if !cached.recursive && o.BaseDir != cached.root {
|
|
debugPrint("cache %s non rec prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir)
|
|
continue
|
|
}
|
|
if o.Recursive && !cached.recursive {
|
|
debugPrint("cache %s not recursive", cached.id)
|
|
// If this is recursive the cached listing must be as well.
|
|
continue
|
|
}
|
|
if o.Separator != slashSeparator && !cached.recursive {
|
|
debugPrint("cache %s not slashsep and not recursive", cached.id)
|
|
// Non slash separator requires recursive.
|
|
continue
|
|
}
|
|
if cached.ended.IsZero() && time.Since(cached.lastUpdate) > metacacheMaxRunningAge {
|
|
debugPrint("cache %s not running, time: %v", cached.id, time.Since(cached.lastUpdate))
|
|
// Abandoned
|
|
continue
|
|
}
|
|
if !cached.ended.IsZero() && cached.endedCycle <= o.OldestCycle {
|
|
debugPrint("cache %s ended and cycle (%v) <= oldest allowed (%v)", cached.id, cached.endedCycle, o.OldestCycle)
|
|
// If scan has ended the oldest requested must be less.
|
|
continue
|
|
}
|
|
if cached.started.Before(best.started) {
|
|
debugPrint("cache %s disregarded - we have a better", cached.id)
|
|
// If we already have a newer, keep that.
|
|
continue
|
|
}
|
|
best = cached
|
|
}
|
|
if !best.started.IsZero() {
|
|
if o.Create {
|
|
best.lastHandout = UTCNow()
|
|
b.caches[best.id] = best
|
|
b.updated = true
|
|
}
|
|
debugPrint("returning cached %s, status: %v, ended: %v", best.id, best.status, best.ended)
|
|
return best
|
|
}
|
|
if !o.Create {
|
|
return metacache{
|
|
id: o.ID,
|
|
bucket: o.Bucket,
|
|
status: scanStateNone,
|
|
}
|
|
}
|
|
|
|
// Create new and add.
|
|
best = o.newMetacache()
|
|
b.caches[o.ID] = best
|
|
b.updated = true
|
|
debugPrint("returning new cache %s, bucket: %v", best.id, best.bucket)
|
|
return best
|
|
}
|
|
|
|
// cleanup removes redundant and outdated entries.
|
|
func (b *bucketMetacache) cleanup() {
|
|
// Entries to remove.
|
|
remove := make(map[string]struct{})
|
|
currentCycle := intDataUpdateTracker.current()
|
|
|
|
debugPrint := func(msg string, data ...interface{}) {}
|
|
if false {
|
|
debugPrint = logger.Info
|
|
}
|
|
|
|
b.mu.RLock()
|
|
for id, cache := range b.caches {
|
|
if b.transient && time.Since(cache.lastUpdate) > 15*time.Minute && time.Since(cache.lastHandout) > 15*time.Minute {
|
|
// Keep transient caches only for 1 hour.
|
|
remove[id] = struct{}{}
|
|
continue
|
|
}
|
|
if !cache.worthKeeping(currentCycle) {
|
|
debugPrint("cache %s not worth keeping", id)
|
|
remove[id] = struct{}{}
|
|
continue
|
|
}
|
|
if cache.id != id {
|
|
logger.Info("cache ID mismatch %s != %s", id, cache.id)
|
|
remove[id] = struct{}{}
|
|
continue
|
|
}
|
|
if cache.bucket != b.bucket && !b.transient {
|
|
logger.Info("cache bucket mismatch %s != %s", b.bucket, cache.bucket)
|
|
remove[id] = struct{}{}
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Check all non-deleted against eachother.
|
|
// O(n*n), but should still be rather quick.
|
|
for id, cache := range b.caches {
|
|
if b.transient {
|
|
break
|
|
}
|
|
if _, ok := remove[id]; ok {
|
|
continue
|
|
}
|
|
for _, cache2 := range b.caches {
|
|
if cache.canBeReplacedBy(&cache2) {
|
|
debugPrint("cache %s can be replaced by %s", id, cache2.id)
|
|
remove[id] = struct{}{}
|
|
break
|
|
} else {
|
|
debugPrint("cache %s can be NOT replaced by %s", id, cache2.id)
|
|
}
|
|
}
|
|
}
|
|
|
|
b.mu.RUnlock()
|
|
for id := range remove {
|
|
b.deleteCache(id)
|
|
}
|
|
}
|
|
|
|
// updateCache will update a cache by id.
|
|
// If the cache cannot be found nil is returned.
|
|
// The bucket cache will be locked until the done .
|
|
func (b *bucketMetacache) updateCache(id string) (cache *metacache, done func()) {
|
|
b.mu.Lock()
|
|
c, ok := b.caches[id]
|
|
if !ok {
|
|
b.mu.Unlock()
|
|
return nil, func() {}
|
|
}
|
|
return &c, func() {
|
|
c.lastUpdate = UTCNow()
|
|
b.caches[id] = c
|
|
b.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// updateCacheEntry will update a cache.
|
|
// Returns the updated status.
|
|
func (b *bucketMetacache) updateCacheEntry(update metacache) (metacache, error) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
existing, ok := b.caches[update.id]
|
|
if !ok {
|
|
logger.Info("updateCacheEntry: bucket %s list id %v not found", b.bucket, update.id)
|
|
return update, errFileNotFound
|
|
}
|
|
|
|
existing.lastUpdate = UTCNow()
|
|
|
|
if existing.status == scanStateStarted && update.status == scanStateSuccess {
|
|
existing.ended = UTCNow()
|
|
existing.endedCycle = update.endedCycle
|
|
}
|
|
|
|
if existing.status == scanStateStarted && update.status != scanStateStarted {
|
|
existing.status = update.status
|
|
}
|
|
|
|
if existing.error == "" && update.error != "" {
|
|
existing.error = update.error
|
|
existing.status = scanStateError
|
|
existing.ended = UTCNow()
|
|
}
|
|
existing.fileNotFound = existing.fileNotFound || update.fileNotFound
|
|
b.caches[update.id] = existing
|
|
b.updated = true
|
|
return existing, nil
|
|
}
|
|
|
|
// getCache will return a clone of a specific metacache.
|
|
// Will return nil if the cache doesn't exist.
|
|
func (b *bucketMetacache) getCache(id string) *metacache {
|
|
b.mu.RLock()
|
|
c, ok := b.caches[id]
|
|
b.mu.RUnlock()
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return &c
|
|
}
|
|
|
|
// deleteAll will delete all on disk data for ALL caches.
|
|
// Deletes are performed concurrently.
|
|
func (b *bucketMetacache) deleteAll() {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
ctx := context.Background()
|
|
ez, ok := newObjectLayerFn().(*erasureServerSets)
|
|
if !ok {
|
|
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureZones"))
|
|
return
|
|
}
|
|
|
|
b.updated = true
|
|
if !b.transient {
|
|
// Delete all.
|
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
|
|
b.caches = make(map[string]metacache, 10)
|
|
return
|
|
}
|
|
|
|
// Transient are in different buckets.
|
|
var wg sync.WaitGroup
|
|
for id := range b.caches {
|
|
wg.Add(1)
|
|
go func(cache metacache) {
|
|
defer wg.Done()
|
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
|
|
}(b.caches[id])
|
|
}
|
|
wg.Wait()
|
|
b.caches = make(map[string]metacache, 10)
|
|
}
|
|
|
|
// deleteCache will delete a specific cache and all files related to it across the cluster.
|
|
func (b *bucketMetacache) deleteCache(id string) {
|
|
b.mu.Lock()
|
|
c, ok := b.caches[id]
|
|
if ok {
|
|
delete(b.caches, id)
|
|
b.updated = true
|
|
}
|
|
b.mu.Unlock()
|
|
if ok {
|
|
c.delete(context.Background())
|
|
}
|
|
}
|