mirror of
https://github.com/minio/minio.git
synced 2025-01-24 05:03:16 -05:00
8bb580abfc
A few places were still using the legacy GetObject() call, which was mainly designed for the client response writer; use GetObjectNInfo() for internal calls instead.
485 lines
12 KiB
Go
485 lines
12 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"path"
|
|
"runtime/debug"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/klauspost/compress/s2"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/console"
|
|
"github.com/minio/minio/pkg/hash"
|
|
"github.com/tinylib/msgp/msgp"
|
|
)
|
|
|
|
//go:generate msgp -file $GOFILE -unexported
|
|
|
|
// a bucketMetacache keeps track of all caches generated
|
|
// for a bucket.
|
|
type bucketMetacache struct {
|
|
// Name of bucket
|
|
bucket string
|
|
|
|
// caches indexed by id.
|
|
caches map[string]metacache
|
|
// cache ids indexed by root paths
|
|
cachesRoot map[string][]string `msg:"-"`
|
|
|
|
// Internal state
|
|
mu sync.RWMutex `msg:"-"`
|
|
updated bool `msg:"-"`
|
|
transient bool `msg:"-"` // bucket used for non-persisted caches.
|
|
}
|
|
|
|
// newBucketMetacache creates a new bucketMetacache.
|
|
// Optionally remove all existing caches.
|
|
func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
|
|
if cleanup {
|
|
// Recursively delete all caches.
|
|
objAPI := newObjectLayerFn()
|
|
ez, ok := objAPI.(*erasureServerPools)
|
|
if ok {
|
|
ctx := context.Background()
|
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
|
|
}
|
|
}
|
|
return &bucketMetacache{
|
|
bucket: bucket,
|
|
caches: make(map[string]metacache, 10),
|
|
cachesRoot: make(map[string][]string, 10),
|
|
}
|
|
}
|
|
|
|
func (b *bucketMetacache) debugf(format string, data ...interface{}) {
|
|
if serverDebugLog {
|
|
console.Debugf(format+"\n", data...)
|
|
}
|
|
}
|
|
|
|
// loadBucketMetaCache will load the cache from the object layer.
|
|
// If the cache cannot be found a new one is created.
|
|
func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) {
|
|
objAPI := newObjectLayerFn()
|
|
for objAPI == nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
default:
|
|
time.Sleep(250 * time.Millisecond)
|
|
}
|
|
objAPI = newObjectLayerFn()
|
|
if objAPI == nil {
|
|
logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket))
|
|
}
|
|
}
|
|
|
|
var meta bucketMetacache
|
|
var decErr error
|
|
// Use global context for this.
|
|
r, err := objAPI.GetObjectNInfo(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), nil, http.Header{}, readLock, ObjectOptions{})
|
|
if err == nil {
|
|
dec := s2DecPool.Get().(*s2.Reader)
|
|
dec.Reset(r)
|
|
decErr = meta.DecodeMsg(msgp.NewReader(dec))
|
|
dec.Reset(nil)
|
|
r.Close()
|
|
s2DecPool.Put(dec)
|
|
}
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case ObjectNotFound:
|
|
err = nil
|
|
case InsufficientReadQuorum:
|
|
// Cache is likely lost. Clean up and return new.
|
|
return newBucketMetacache(bucket, true), nil
|
|
default:
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
return newBucketMetacache(bucket, false), err
|
|
}
|
|
if decErr != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
return newBucketMetacache(bucket, false), err
|
|
}
|
|
// Log the error, but assume the data is lost and return a fresh bucket.
|
|
// Otherwise a broken cache will never recover.
|
|
logger.LogIf(ctx, decErr)
|
|
return newBucketMetacache(bucket, true), nil
|
|
}
|
|
// Sanity check...
|
|
if meta.bucket != bucket {
|
|
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
|
|
return newBucketMetacache(bucket, true), nil
|
|
}
|
|
meta.cachesRoot = make(map[string][]string, len(meta.caches)/10)
|
|
// Index roots
|
|
for id, cache := range meta.caches {
|
|
meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id)
|
|
}
|
|
return &meta, nil
|
|
}
|
|
|
|
// save the bucket cache to the object storage.
|
|
func (b *bucketMetacache) save(ctx context.Context) error {
|
|
if b.transient {
|
|
return nil
|
|
}
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
// Keep lock while we marshal.
|
|
// We need a write lock since we update 'updated'
|
|
b.mu.Lock()
|
|
if !b.updated {
|
|
b.mu.Unlock()
|
|
return nil
|
|
}
|
|
// Save as s2 compressed msgpack
|
|
tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize()))
|
|
enc := s2.NewWriter(tmp)
|
|
err := msgp.Encode(enc, b)
|
|
if err != nil {
|
|
b.mu.Unlock()
|
|
return err
|
|
}
|
|
err = enc.Close()
|
|
if err != nil {
|
|
b.mu.Unlock()
|
|
return err
|
|
}
|
|
b.updated = false
|
|
b.mu.Unlock()
|
|
|
|
hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len()), false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr, nil, nil), ObjectOptions{})
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
|
|
// findCache will attempt to find a matching cache for the provided options.
|
|
// If a cache with the same ID exists already it will be returned.
|
|
// If none can be found a new is created with the provided ID.
|
|
func (b *bucketMetacache) findCache(o listPathOptions) metacache {
|
|
if b == nil {
|
|
logger.Info("bucketMetacache.findCache: nil cache for bucket %s", o.Bucket)
|
|
return metacache{}
|
|
}
|
|
|
|
if o.Bucket != b.bucket && !b.transient {
|
|
logger.Info("bucketMetacache.findCache: bucket %s does not match this bucket %s", o.Bucket, b.bucket)
|
|
debug.PrintStack()
|
|
return metacache{}
|
|
}
|
|
|
|
extend := globalAPIConfig.getExtendListLife()
|
|
|
|
// Grab a write lock, since we create one if we cannot find one.
|
|
if o.Create {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
} else {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
}
|
|
|
|
// Check if exists already.
|
|
if c, ok := b.caches[o.ID]; ok {
|
|
b.debugf("returning existing %v", o.ID)
|
|
return c
|
|
}
|
|
// No need to do expensive checks on transients.
|
|
if b.transient {
|
|
if !o.Create {
|
|
return metacache{
|
|
id: o.ID,
|
|
bucket: o.Bucket,
|
|
status: scanStateNone,
|
|
}
|
|
}
|
|
|
|
// Create new
|
|
best := o.newMetacache()
|
|
b.caches[o.ID] = best
|
|
b.updated = true
|
|
b.debugf("returning new cache %s, bucket: %v", best.id, best.bucket)
|
|
return best
|
|
}
|
|
|
|
var best metacache
|
|
rootSplit := strings.Split(o.BaseDir, slashSeparator)
|
|
for i := range rootSplit {
|
|
interesting := b.cachesRoot[path.Join(rootSplit[:i+1]...)]
|
|
|
|
for _, id := range interesting {
|
|
cached, ok := b.caches[id]
|
|
if !ok {
|
|
continue
|
|
}
|
|
if !cached.matches(&o, extend) {
|
|
continue
|
|
}
|
|
if cached.started.Before(best.started) {
|
|
b.debugf("cache %s disregarded - we have a better", cached.id)
|
|
// If we already have a newer, keep that.
|
|
continue
|
|
}
|
|
best = cached
|
|
}
|
|
}
|
|
if !best.started.IsZero() {
|
|
if o.Create {
|
|
best.lastHandout = UTCNow()
|
|
b.caches[best.id] = best
|
|
b.updated = true
|
|
}
|
|
b.debugf("returning cached %s, status: %v, ended: %v", best.id, best.status, best.ended)
|
|
return best
|
|
}
|
|
if !o.Create {
|
|
return metacache{
|
|
id: o.ID,
|
|
bucket: o.Bucket,
|
|
status: scanStateNone,
|
|
}
|
|
}
|
|
|
|
// Create new and add.
|
|
best = o.newMetacache()
|
|
b.caches[o.ID] = best
|
|
b.cachesRoot[best.root] = append(b.cachesRoot[best.root], best.id)
|
|
b.updated = true
|
|
b.debugf("returning new cache %s, bucket: %v", best.id, best.bucket)
|
|
return best
|
|
}
|
|
|
|
// cleanup removes redundant and outdated entries.
|
|
func (b *bucketMetacache) cleanup() {
|
|
// Entries to remove.
|
|
remove := make(map[string]struct{})
|
|
currentCycle := intDataUpdateTracker.current()
|
|
|
|
// Test on a copy
|
|
// cleanup is the only one deleting caches.
|
|
caches, rootIdx := b.cloneCaches()
|
|
|
|
for id, cache := range caches {
|
|
if b.transient && time.Since(cache.lastUpdate) > 15*time.Minute && time.Since(cache.lastHandout) > 15*time.Minute {
|
|
// Keep transient caches only for 15 minutes.
|
|
remove[id] = struct{}{}
|
|
continue
|
|
}
|
|
if !cache.worthKeeping(currentCycle) {
|
|
b.debugf("cache %s not worth keeping", id)
|
|
remove[id] = struct{}{}
|
|
continue
|
|
}
|
|
if cache.id != id {
|
|
logger.Info("cache ID mismatch %s != %s", id, cache.id)
|
|
remove[id] = struct{}{}
|
|
continue
|
|
}
|
|
if cache.bucket != b.bucket && !b.transient {
|
|
logger.Info("cache bucket mismatch %s != %s", b.bucket, cache.bucket)
|
|
remove[id] = struct{}{}
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Check all non-deleted against eachother.
|
|
// O(n*n), but should still be rather quick.
|
|
for id, cache := range caches {
|
|
if b.transient {
|
|
break
|
|
}
|
|
if _, ok := remove[id]; ok {
|
|
continue
|
|
}
|
|
|
|
interesting := interestingCaches(cache.root, rootIdx)
|
|
for _, id2 := range interesting {
|
|
if _, ok := remove[id2]; ok || id2 == id {
|
|
// Don't check against one we are already removing
|
|
continue
|
|
}
|
|
cache2, ok := caches[id2]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if cache.canBeReplacedBy(&cache2) {
|
|
b.debugf("cache %s can be replaced by %s", id, cache2.id)
|
|
remove[id] = struct{}{}
|
|
break
|
|
} else {
|
|
b.debugf("cache %s can be NOT replaced by %s", id, cache2.id)
|
|
}
|
|
}
|
|
}
|
|
|
|
for id := range remove {
|
|
b.deleteCache(id)
|
|
}
|
|
}
|
|
|
|
// Potentially interesting caches.
|
|
// Will only add root if request is for root.
|
|
func interestingCaches(root string, cachesRoot map[string][]string) []string {
|
|
var interesting []string
|
|
rootSplit := strings.Split(root, slashSeparator)
|
|
for i := range rootSplit {
|
|
want := path.Join(rootSplit[:i+1]...)
|
|
interesting = append(interesting, cachesRoot[want]...)
|
|
}
|
|
return interesting
|
|
}
|
|
|
|
// updateCache will update a cache by id.
|
|
// If the cache cannot be found nil is returned.
|
|
// The bucket cache will be locked until the done .
|
|
func (b *bucketMetacache) updateCache(id string) (cache *metacache, done func()) {
|
|
b.mu.Lock()
|
|
c, ok := b.caches[id]
|
|
if !ok {
|
|
b.mu.Unlock()
|
|
return nil, func() {}
|
|
}
|
|
return &c, func() {
|
|
c.lastUpdate = UTCNow()
|
|
b.caches[id] = c
|
|
b.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// updateCacheEntry will update a cache.
|
|
// Returns the updated status.
|
|
func (b *bucketMetacache) updateCacheEntry(update metacache) (metacache, error) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
existing, ok := b.caches[update.id]
|
|
if !ok {
|
|
logger.Info("updateCacheEntry: bucket %s list id %v not found", b.bucket, update.id)
|
|
return update, errFileNotFound
|
|
}
|
|
existing.update(update)
|
|
b.caches[update.id] = existing
|
|
b.updated = true
|
|
return existing, nil
|
|
}
|
|
|
|
// cloneCaches will return a clone of all current caches.
|
|
func (b *bucketMetacache) cloneCaches() (map[string]metacache, map[string][]string) {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
dst := make(map[string]metacache, len(b.caches))
|
|
for k, v := range b.caches {
|
|
dst[k] = v
|
|
}
|
|
// Copy indexes
|
|
dst2 := make(map[string][]string, len(b.cachesRoot))
|
|
for k, v := range b.cachesRoot {
|
|
tmp := make([]string, len(v))
|
|
copy(tmp, v)
|
|
dst2[k] = tmp
|
|
}
|
|
|
|
return dst, dst2
|
|
}
|
|
|
|
// getCache will return a clone of a specific metacache.
|
|
// Will return nil if the cache doesn't exist.
|
|
func (b *bucketMetacache) getCache(id string) *metacache {
|
|
b.mu.RLock()
|
|
c, ok := b.caches[id]
|
|
b.mu.RUnlock()
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return &c
|
|
}
|
|
|
|
// deleteAll will delete all on disk data for ALL caches.
|
|
// Deletes are performed concurrently.
|
|
func (b *bucketMetacache) deleteAll() {
|
|
ctx := context.Background()
|
|
ez, ok := newObjectLayerFn().(*erasureServerPools)
|
|
if !ok {
|
|
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasurePools"))
|
|
return
|
|
}
|
|
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
b.updated = true
|
|
if !b.transient {
|
|
// Delete all.
|
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
|
|
b.caches = make(map[string]metacache, 10)
|
|
b.cachesRoot = make(map[string][]string, 10)
|
|
return
|
|
}
|
|
|
|
// Transient are in different buckets.
|
|
var wg sync.WaitGroup
|
|
for id := range b.caches {
|
|
wg.Add(1)
|
|
go func(cache metacache) {
|
|
defer wg.Done()
|
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
|
|
}(b.caches[id])
|
|
}
|
|
wg.Wait()
|
|
b.caches = make(map[string]metacache, 10)
|
|
}
|
|
|
|
// deleteCache will delete a specific cache and all files related to it across the cluster.
|
|
func (b *bucketMetacache) deleteCache(id string) {
|
|
b.mu.Lock()
|
|
c, ok := b.caches[id]
|
|
if ok {
|
|
// Delete from root map.
|
|
list := b.cachesRoot[c.root]
|
|
for i, lid := range list {
|
|
if id == lid {
|
|
list = append(list[:i], list[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
b.cachesRoot[c.root] = list
|
|
delete(b.caches, id)
|
|
b.updated = true
|
|
}
|
|
b.mu.Unlock()
|
|
if ok {
|
|
c.delete(context.Background())
|
|
}
|
|
}
|