minio/cmd/metacache-server-pool.go
Klaus Post f706671568
fix: multi-way list entry resolution (#12617)
Fixes brought forward from https://github.com/minio/minio/pull/12605

Fixes resolution when an object is in prefix of another and one zone returns the directory and another the object.

Fixes resolution on single entries that arrive first, so resolution doesn't depend on order.
2021-07-02 09:54:00 -07:00

251 lines
7.3 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
pathutil "path"
"strings"
"sync"
"time"
"github.com/minio/minio/internal/logger"
)
// renameAllBucketMetacache walks the bucket metadata directory below epPath
// and, for every directory entry reported by readDirFn, renames that bucket's
// metacache directory to a uniquely named location so it can be deleted later.
//
// A missing metacache directory (errFileNotFound) is not an error. Any other
// rename failure is returned, wrapped together with the source and destination
// paths of the rename that was attempted.
func renameAllBucketMetacache(epPath string) error {
	// Rename all previous `.minio.sys/buckets/<bucketname>/.metacache`
	// to `.minio.sys/tmp/` for deletion.
	return readDirFn(pathJoin(epPath, minioMetaBucket, bucketMetaPrefix), func(name string, typ os.FileMode) error {
		// Only directories can hold a bucket metacache.
		if typ != os.ModeDir {
			return nil
		}
		// Each bucket gets its own unique destination so renames never collide.
		tmpMetacacheOld := pathutil.Join(epPath, minioMetaTmpDeletedBucket, mustGetUUID())
		// Build the source path once so the error message reports exactly
		// the path that was passed to renameAll.
		src := pathJoin(epPath, minioMetaBucket, metacachePrefixForID(name, slashSeparator))
		if err := renameAll(src, tmpMetacacheOld); err != nil && err != errFileNotFound {
			return fmt.Errorf("unable to rename (%s -> %s) %w",
				src,
				tmpMetacacheOld,
				osErrToFileErr(err))
		}
		return nil
	})
}
// listPath will return the requested entries.
// If no more entries are in the listing io.EOF is returned,
// otherwise nil or an unexpected error is returned.
// The listPathOptions given will be checked and modified internally.
// Required important fields are Bucket, Prefix, Separator.
// Other important fields are Limit, Marker.
// The list ID is always derived from the Marker.
//
// The listing is requested from every set of every server pool concurrently
// and the per-set results are merged, de-duplicated and truncated to o.Limit.
func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
	if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker, z); err != nil {
		return entries, err
	}

	// When a marker is set, validate the pre-condition.
	if o.Marker != "" && o.Prefix != "" {
		// Marker not common with prefix is not implemented. Send an empty response
		if !HasPrefix(o.Marker, o.Prefix) {
			return entries, io.EOF
		}
	}

	// With max keys of zero we have reached eof, return right here.
	if o.Limit == 0 {
		return entries, io.EOF
	}

	// For delimiter and prefix as '/' we do not list anything at all
	// along with the prefix. On a flat namespace with 'prefix'
	// as '/' we don't have any entries, since all the keys are
	// of form 'keyName/...'
	if strings.HasPrefix(o.Prefix, SlashSeparator) {
		return entries, io.EOF
	}

	// If delimiter is slashSeparator we must return directories of
	// the non-recursive scan unless explicitly requested.
	o.IncludeDirectories = o.Separator == slashSeparator
	if (o.Separator == slashSeparator || o.Separator == "") && !o.Recursive {
		o.Recursive = o.Separator != slashSeparator
		o.Separator = slashSeparator
	} else {
		// Default is recursive, if delimiter is set then list non recursive.
		o.Recursive = true
	}

	// Decode and get the optional list id from the marker.
	o.Marker, o.ID = parseMarker(o.Marker)
	o.Create = o.ID == ""
	if o.ID == "" {
		o.ID = mustGetUUID()
	}
	o.BaseDir = baseDirFromPrefix(o.Prefix)
	if o.discardResult {
		// Override for single object.
		o.BaseDir = o.Prefix
	}

	// For very small recursive listings, don't save the cache.
	// Attempts to avoid expensive listings to run for a long
	// while when clients aren't interested in results.
	// If the client DOES resume the listing a full cache
	// will be generated due to the marker without ID and this check failing.
	if o.Limit < 10 && o.Marker == "" && o.Create && o.Recursive {
		o.discardResult = true
		o.Transient = true
	}

	// cache is the listing cache located below. It must NOT be redeclared
	// inside the o.Create block: the "all not found" update further down
	// needs the cache that was actually found.
	var cache metacache
	// If we don't have a list id we must ask the server if it has a cache or create a new.
	if o.Create {
		o.CurrentCycle = intDataUpdateTracker.current()
		o.OldestCycle = globalNotificationSys.findEarliestCleanBloomFilter(ctx, path.Join(o.Bucket, o.BaseDir))
		rpc := globalNotificationSys.restClientFromHash(o.Bucket)
		if isReservedOrInvalidBucket(o.Bucket, false) {
			rpc = nil
			o.Transient = true
		}
		// Apply prefix filter if enabled.
		o.SetFilter()
		if rpc == nil || o.Transient {
			// Local
			cache = localMetacacheMgr.findCache(ctx, o)
		} else {
			// Bound the remote lookup; on failure fall back to a local,
			// transient cache instead of failing the listing.
			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()
			c, err := rpc.GetMetacacheListing(ctx, o)
			if err != nil {
				if errors.Is(err, context.Canceled) {
					// Context is canceled, return at once.
					// request canceled, no entries to return
					return entries, io.EOF
				}
				if !errors.Is(err, context.DeadlineExceeded) {
					logger.LogIf(ctx, err)
				}
				o.Transient = true
				cache = localMetacacheMgr.findCache(ctx, o)
			} else {
				cache = *c
			}
		}
		if cache.fileNotFound {
			// No cache found, no entries found.
			return entries, io.EOF
		}
		// Only create if we created a new.
		o.Create = o.ID == cache.id
		o.ID = cache.id
	}

	var mu sync.Mutex
	var wg sync.WaitGroup
	var errs []error
	allAtEOF := true

	// Hold the mutex while the goroutines are being started: each goroutine
	// only touches errs/entries after acquiring mu, so none of them can
	// observe errs before it has reached its final length.
	mu.Lock()
	// Ask all sets and merge entries.
	for _, pool := range z.serverPools {
		for _, set := range pool.sets {
			wg.Add(1)
			go func(i int, set *erasureObjects) {
				defer wg.Done()
				// The listing itself runs without the lock, in parallel
				// across sets; only merging is serialized.
				e, err := set.listPath(ctx, o)
				mu.Lock()
				defer mu.Unlock()
				if err == nil {
					allAtEOF = false
				}
				errs[i] = err
				entries.merge(e, -1)

				// Resolve non-trivial conflicts
				entries.deduplicate(func(existing, other *metaCacheEntry) (replace bool) {
					// Pick object over directory
					if existing.isDir() && !other.isDir() {
						return true
					}
					if !existing.isDir() && other.isDir() {
						return false
					}
					// An entry whose file info cannot be read loses.
					eFIV, err := existing.fileInfo(o.Bucket)
					if err != nil {
						return true
					}
					oFIV, err := other.fileInfo(o.Bucket)
					if err != nil {
						return false
					}
					// Replace if modtime is newer
					if !oFIV.ModTime.Equal(eFIV.ModTime) {
						return oFIV.ModTime.After(eFIV.ModTime)
					}
					// Use NumVersions as a final tiebreaker.
					return oFIV.NumVersions > eFIV.NumVersions
				})
				if entries.len() > o.Limit {
					allAtEOF = false
					entries.truncate(o.Limit)
				}
			}(len(errs), set)
			// Reserve the slot whose index was just handed to the goroutine.
			errs = append(errs, nil)
		}
	}
	mu.Unlock()
	wg.Wait()

	if isAllNotFound(errs) {
		// All sets returned not found.
		// NOTE(review): when the listing was resumed from a marker
		// (o.Create false above) cache is still the zero value here —
		// confirm whether the update should be skipped in that case.
		go func() {
			// Update master cache with that information.
			cache.status = scanStateSuccess
			cache.fileNotFound = true
			o.updateMetacacheListing(cache, globalNotificationSys.restClientFromHash(o.Bucket))
		}()
		// cache returned not found, entries truncated.
		return entries, io.EOF
	}

	for _, err := range errs {
		if err == nil {
			allAtEOF = false
			continue
		}
		// Compared by message rather than identity; presumably because the
		// error may not be the io.EOF value itself — TODO confirm.
		if err.Error() == io.EOF.Error() {
			continue
		}
		logger.LogIf(ctx, err)
		return entries, err
	}

	// The result is truncated if we hold more entries than requested or
	// if at least one set has not reached the end of its listing.
	truncated := entries.len() > o.Limit || !allAtEOF
	entries.truncate(o.Limit)
	if !o.discardResult {
		entries.listID = o.ID
	}
	if !truncated {
		return entries, io.EOF
	}
	return entries, nil
}