mirror of
https://github.com/minio/minio.git
synced 2024-12-25 22:55:54 -05:00
b3c56b53fb
To avoid large delays in metacache cleanup, use rename instead of recursive delete calls. Renames are cheaper: move the content to minioMetaTmpBucket and then clean up this folder once every 24hrs instead. If the new cache can replace an existing one, we should let it replace it, since that is currently being saved anyway; this avoids a pile-up of 1000's of metacache entries for the same listing calls that are not necessary to store on disk.
245 lines
6.9 KiB
Go
245 lines
6.9 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
)
|
|
|
|
func renameAllBucketMetacache(epPath string) error {
|
|
// Rename all previous `.minio.sys/buckets/<bucketname>/.metacache` to
|
|
// to `.minio.sys/tmp/` for deletion.
|
|
return readDirFilterFn(pathJoin(epPath, minioMetaBucket, bucketMetaPrefix), func(name string, typ os.FileMode) error {
|
|
if typ == os.ModeDir {
|
|
tmpMetacacheOld := pathJoin(epPath, minioMetaTmpBucket+"-old", mustGetUUID())
|
|
if err := renameAll(pathJoin(epPath, minioMetaBucket, metacachePrefixForID(name, slashSeparator)),
|
|
tmpMetacacheOld); err != nil && err != errFileNotFound {
|
|
return fmt.Errorf("unable to rename (%s -> %s) %w",
|
|
pathJoin(epPath, minioMetaBucket+metacachePrefixForID(minioMetaBucket, slashSeparator)),
|
|
tmpMetacacheOld,
|
|
osErrToFileErr(err))
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// listPath will return the requested entries.
|
|
// If no more entries are in the listing io.EOF is returned,
|
|
// otherwise nil or an unexpected error is returned.
|
|
// The listPathOptions given will be checked and modified internally.
|
|
// Required important fields are Bucket, Prefix, Separator.
|
|
// Other important fields are Limit, Marker.
|
|
// List ID always derived from the Marker.
|
|
func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
|
|
if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker, z); err != nil {
|
|
return entries, err
|
|
}
|
|
|
|
// Marker is set validate pre-condition.
|
|
if o.Marker != "" && o.Prefix != "" {
|
|
// Marker not common with prefix is not implemented. Send an empty response
|
|
if !HasPrefix(o.Marker, o.Prefix) {
|
|
return entries, io.EOF
|
|
}
|
|
}
|
|
|
|
// With max keys of zero we have reached eof, return right here.
|
|
if o.Limit == 0 {
|
|
return entries, io.EOF
|
|
}
|
|
|
|
// For delimiter and prefix as '/' we do not list anything at all
|
|
// along // with the prefix. On a flat namespace with 'prefix'
|
|
// as '/' we don't have any entries, since all the keys are
|
|
// of form 'keyName/...'
|
|
if strings.HasPrefix(o.Prefix, SlashSeparator) {
|
|
return entries, io.EOF
|
|
}
|
|
|
|
// Over flowing count - reset to maxObjectList.
|
|
if o.Limit < 0 || o.Limit > maxObjectList {
|
|
o.Limit = maxObjectList
|
|
}
|
|
|
|
// If delimiter is slashSeparator we must return directories of
|
|
// the non-recursive scan unless explicitly requested.
|
|
o.IncludeDirectories = o.Separator == slashSeparator
|
|
if (o.Separator == slashSeparator || o.Separator == "") && !o.Recursive {
|
|
o.Recursive = o.Separator != slashSeparator
|
|
o.Separator = slashSeparator
|
|
} else {
|
|
// Default is recursive, if delimiter is set then list non recursive.
|
|
o.Recursive = true
|
|
}
|
|
|
|
// Decode and get the optional list id from the marker.
|
|
o.Marker, o.ID = parseMarker(o.Marker)
|
|
o.Create = o.ID == ""
|
|
if o.ID == "" {
|
|
o.ID = mustGetUUID()
|
|
}
|
|
o.BaseDir = baseDirFromPrefix(o.Prefix)
|
|
if o.discardResult {
|
|
// Override for single object.
|
|
o.BaseDir = o.Prefix
|
|
}
|
|
|
|
// For very small recursive listings, don't same cache.
|
|
// Attempts to avoid expensive listings to run for a long
|
|
// while when clients aren't interested in results.
|
|
// If the client DOES resume the listing a full cache
|
|
// will be generated due to the marker without ID and this check failing.
|
|
if o.Limit < 10 && o.Marker == "" && o.Create && o.Recursive {
|
|
o.discardResult = true
|
|
o.Transient = true
|
|
}
|
|
|
|
var cache metacache
|
|
// If we don't have a list id we must ask the server if it has a cache or create a new.
|
|
if o.Create {
|
|
o.CurrentCycle = intDataUpdateTracker.current()
|
|
o.OldestCycle = globalNotificationSys.findEarliestCleanBloomFilter(ctx, path.Join(o.Bucket, o.BaseDir))
|
|
var cache metacache
|
|
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
|
|
if isReservedOrInvalidBucket(o.Bucket, false) {
|
|
rpc = nil
|
|
o.Transient = true
|
|
}
|
|
// Apply prefix filter if enabled.
|
|
o.SetFilter()
|
|
if rpc == nil || o.Transient {
|
|
// Local
|
|
cache = localMetacacheMgr.findCache(ctx, o)
|
|
} else {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
c, err := rpc.GetMetacacheListing(ctx, o)
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
// Context is canceled, return at once.
|
|
// request canceled, no entries to return
|
|
return entries, io.EOF
|
|
}
|
|
if !errors.Is(err, context.DeadlineExceeded) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
o.Transient = true
|
|
cache = localMetacacheMgr.findCache(ctx, o)
|
|
} else {
|
|
cache = *c
|
|
}
|
|
}
|
|
if cache.fileNotFound {
|
|
// No cache found, no entries found.
|
|
return entries, io.EOF
|
|
}
|
|
// Only create if we created a new.
|
|
o.Create = o.ID == cache.id
|
|
o.ID = cache.id
|
|
}
|
|
|
|
var mu sync.Mutex
|
|
var wg sync.WaitGroup
|
|
var errs []error
|
|
allAtEOF := true
|
|
mu.Lock()
|
|
// Ask all sets and merge entries.
|
|
for _, pool := range z.serverPools {
|
|
for _, set := range pool.sets {
|
|
wg.Add(1)
|
|
go func(i int, set *erasureObjects) {
|
|
defer wg.Done()
|
|
e, err := set.listPath(ctx, o)
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
if err == nil {
|
|
allAtEOF = false
|
|
}
|
|
errs[i] = err
|
|
entries.merge(e, -1)
|
|
|
|
// Resolve non-trivial conflicts
|
|
entries.deduplicate(func(existing, other *metaCacheEntry) (replace bool) {
|
|
if existing.isDir() {
|
|
return false
|
|
}
|
|
eFIV, err := existing.fileInfo(o.Bucket)
|
|
if err != nil {
|
|
return true
|
|
}
|
|
oFIV, err := existing.fileInfo(o.Bucket)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return oFIV.ModTime.After(eFIV.ModTime)
|
|
})
|
|
if entries.len() > o.Limit {
|
|
allAtEOF = false
|
|
entries.truncate(o.Limit)
|
|
}
|
|
}(len(errs), set)
|
|
errs = append(errs, nil)
|
|
}
|
|
}
|
|
mu.Unlock()
|
|
wg.Wait()
|
|
|
|
if isAllNotFound(errs) {
|
|
// All sets returned not found.
|
|
go func() {
|
|
// Update master cache with that information.
|
|
cache.status = scanStateSuccess
|
|
cache.fileNotFound = true
|
|
o.updateMetacacheListing(cache, globalNotificationSys.restClientFromHash(o.Bucket))
|
|
}()
|
|
// cache returned not found, entries truncated.
|
|
return entries, io.EOF
|
|
}
|
|
|
|
for _, err := range errs {
|
|
if err == nil {
|
|
allAtEOF = false
|
|
continue
|
|
}
|
|
if err.Error() == io.EOF.Error() {
|
|
continue
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
return entries, err
|
|
}
|
|
truncated := entries.len() > o.Limit || !allAtEOF
|
|
entries.truncate(o.Limit)
|
|
if !o.discardResult {
|
|
entries.listID = o.ID
|
|
}
|
|
if !truncated {
|
|
return entries, io.EOF
|
|
}
|
|
return entries, nil
|
|
}
|