/*
 * MinIO Cloud Storage, (C) 2020 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/minio/minio/cmd/logger"
)

func renameAllBucketMetacache(epPath string) error {
	// Rename all previous `.minio.sys/buckets/<bucketname>/.metacache` to
	// `.minio.sys/tmp/` for deletion.
	return readDirFn(pathJoin(epPath, minioMetaBucket, bucketMetaPrefix), func(name string, typ os.FileMode) error {
		if typ == os.ModeDir {
			tmpMetacacheOld := pathJoin(epPath, minioMetaTmpBucket+"-old", mustGetUUID())
			if err := renameAll(pathJoin(epPath, minioMetaBucket, metacachePrefixForID(name, slashSeparator)),
				tmpMetacacheOld); err != nil && err != errFileNotFound {
				return fmt.Errorf("unable to rename (%s -> %s) %w",
					pathJoin(epPath, minioMetaBucket, metacachePrefixForID(name, slashSeparator)),
					tmpMetacacheOld,
					osErrToFileErr(err))
			}
		}
		return nil
	})
}

// listPath will return the requested entries.
// If there are no more entries in the listing, io.EOF is returned;
// otherwise nil or an unexpected error is returned.
// The listPathOptions given will be checked and modified internally.
// Required important fields are Bucket, Prefix, Separator.
// Other important fields are Limit, Marker.
// The list ID is always derived from the Marker.
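//
// For illustration (format per encodeMarker/parseMarker in
// metacache-marker.go at the time of writing; an internal detail that may
// change): a continuation marker carrying a list ID looks roughly like
//
//	photos/march/img.jpg[minio_cache:v1,id:<uuid>]
//
// parseMarker below splits this back into the plain marker and the cache ID.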
func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
	if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker, z); err != nil {
		return entries, err
	}

	// If a marker is set, validate the pre-condition.
	if o.Marker != "" && o.Prefix != "" {
		// A marker that doesn't share the prefix is not implemented. Send an empty response.
		if !HasPrefix(o.Marker, o.Prefix) {
			return entries, io.EOF
		}
	}

	// With max keys of zero we have reached EOF, return right here.
	if o.Limit == 0 {
		return entries, io.EOF
	}

	// For a delimiter and prefix of '/' we do not list anything at all
	// along with the prefix. On a flat namespace with '/' as the prefix
	// there are no entries, since all keys are of the form 'keyName/...'.
	if strings.HasPrefix(o.Prefix, SlashSeparator) {
		return entries, io.EOF
	}

	// Overflowing count - reset to maxObjectList.
	if o.Limit < 0 || o.Limit > maxObjectList {
		o.Limit = maxObjectList
	}

	// If the delimiter is slashSeparator we must return directories of
	// the non-recursive scan unless explicitly requested.
	o.IncludeDirectories = o.Separator == slashSeparator
	if (o.Separator == slashSeparator || o.Separator == "") && !o.Recursive {
		o.Recursive = o.Separator != slashSeparator
		o.Separator = slashSeparator
	} else {
		// Default is recursive; if a delimiter is set, list non-recursively.
		o.Recursive = true
	}

	// Decode and get the optional list ID from the marker.
	o.Marker, o.ID = parseMarker(o.Marker)
	o.Create = o.ID == ""
	if o.ID == "" {
		o.ID = mustGetUUID()
	}
	o.BaseDir = baseDirFromPrefix(o.Prefix)
	if o.discardResult {
		// Override for single object.
		o.BaseDir = o.Prefix
	}

	// For very small recursive listings, don't save the cache.
	// This attempts to avoid expensive listings running for a long
	// while when clients aren't interested in the results.
	// If the client DOES resume the listing, a full cache
	// will be generated, since the marker carries no ID and this check fails.
	if o.Limit < 10 && o.Marker == "" && o.Create && o.Recursive {
		o.discardResult = true
	}

	var cache metacache
	// If we don't have a list ID we must ask the server if it has a cache, or create a new one.
	if o.Create {
		o.CurrentCycle = intDataUpdateTracker.current()
		o.OldestCycle = globalNotificationSys.findEarliestCleanBloomFilter(ctx, path.Join(o.Bucket, o.BaseDir))
		rpc := globalNotificationSys.restClientFromHash(o.Bucket)
		if isReservedOrInvalidBucket(o.Bucket, false) {
			// Discard all list caches for reserved buckets.
			o.discardResult = true
			rpc = nil
		}
		// Apply prefix filter if enabled.
		o.SetFilter()
		if rpc == nil {
			// Local
			cache = localMetacacheMgr.findCache(ctx, o)
		} else {
			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()
			c, err := rpc.GetMetacacheListing(ctx, o)
			if err != nil {
				if errors.Is(err, context.Canceled) {
					// Request canceled, no entries to return; return at once.
					return entries, io.EOF
				}
				if !errors.Is(err, context.DeadlineExceeded) {
					logger.LogIf(ctx, err)
				}
				cache = localMetacacheMgr.findCache(ctx, o)
			} else {
				cache = *c
			}
		}
		if cache.fileNotFound {
			// No cache found, no entries found.
			return entries, io.EOF
		}
		// Only keep Create set if the cache we got back is the new one we asked for.
		o.Create = o.ID == cache.id
		o.ID = cache.id
	}

	var mu sync.Mutex
	var wg sync.WaitGroup
	var errs []error
	allAtEOF := true
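	// The lock is taken before spawning and released only after the loops
	// below finish: every worker re-acquires mu before writing, so no worker
	// can index into errs until append has created a slot for each set.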
	mu.Lock()
	// Ask all sets and merge entries.
	for _, pool := range z.serverPools {
		for _, set := range pool.sets {
			wg.Add(1)
			go func(i int, set *erasureObjects) {
				defer wg.Done()
				e, err := set.listPath(ctx, o)
				mu.Lock()
				defer mu.Unlock()
				if err == nil {
					allAtEOF = false
				}
				errs[i] = err
				entries.merge(e, -1)

				// Resolve non-trivial conflicts.
				entries.deduplicate(func(existing, other *metaCacheEntry) (replace bool) {
					if existing.isDir() {
						return false
					}
					eFIV, err := existing.fileInfo(o.Bucket)
					if err != nil {
						return true
					}
					// Compare against the conflicting entry, not existing itself.
					oFIV, err := other.fileInfo(o.Bucket)
					if err != nil {
						return false
					}
					return oFIV.ModTime.After(eFIV.ModTime)
				})
				if entries.len() > o.Limit {
					allAtEOF = false
					entries.truncate(o.Limit)
				}
			}(len(errs), set)
			errs = append(errs, nil)
		}
	}
	mu.Unlock()
	wg.Wait()

	if isAllNotFound(errs) {
		// All sets returned not found.
		go func() {
			// Update master cache with that information.
			cache.status = scanStateSuccess
			cache.fileNotFound = true
			o.updateMetacacheListing(cache, globalNotificationSys.restClientFromHash(o.Bucket))
		}()
		// cache returned not found, entries truncated.
		return entries, io.EOF
	}

	for _, err := range errs {
		if err == nil {
			allAtEOF = false
			continue
		}
		if err.Error() == io.EOF.Error() {
			continue
		}
		logger.LogIf(ctx, err)
		return entries, err
	}
	truncated := entries.len() > o.Limit || !allAtEOF
	entries.truncate(o.Limit)
	if !o.discardResult {
		entries.listID = o.ID
	}
	if !truncated {
		return entries, io.EOF
	}
	return entries, nil
}
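
// Example (an illustrative sketch, not part of this file; `o`, `lastKey` and
// the loop are hypothetical): a caller pages through a listing by feeding the
// returned listID back via the marker, so listPath resumes the same cache:
//
//	o := listPathOptions{Bucket: "mybucket", Prefix: "photos/", Separator: slashSeparator, Limit: 100}
//	for {
//		entries, err := z.listPath(ctx, o)
//		// ... hand entries to the client, remembering the last key as lastKey ...
//		if err == io.EOF {
//			break // listing complete
//		}
//		if err != nil {
//			return err
//		}
//		o.Marker = encodeMarker(lastKey, entries.listID) // resume with the same cache ID
//	}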