diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 0041ea11a..616e3c01c 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -340,7 +340,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq } } - if serverDebugLog { + if serverDebugLog && len(healDisks) > 0 { console.Debugf(color.Green("healDisk:")+" disk check timer fired, attempting to heal %d drives\n", len(healDisks)) } diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go index 8ff3bad76..3bf66acfd 100644 --- a/cmd/data-update-tracker.go +++ b/cmd/data-update-tracker.go @@ -176,9 +176,6 @@ func (d *dataUpdateTracker) latestWithDir(dir string) uint64 { return d.current() } if isReservedOrInvalidBucket(bucket, false) { - if d.debug { - console.Debugf(dateUpdateTrackerLogPrefix+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, dir) - } return d.current() } @@ -486,9 +483,6 @@ func (d *dataUpdateTracker) startCollector(ctx context.Context) { } if isReservedOrInvalidBucket(bucket, false) { - if d.debug { - console.Debugf(color.Green("dataUpdateTracker:")+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, in) - } continue } split := splitPathDeterministic(in) @@ -512,7 +506,6 @@ func (d *dataUpdateTracker) markDirty(bucket, prefix string) { } if isReservedOrInvalidBucket(bucket, false) && d.debug { - console.Debugf(dateUpdateTrackerLogPrefix+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, prefix) return } split := splitPathDeterministic(pathJoin(bucket, prefix)) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index e5a86b941..5179466bd 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -26,7 +26,6 @@ import ( "net/http" "sort" "strconv" - "strings" "sync" "time" @@ -948,24 +947,16 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre AskDisks: globalAPIConfig.getListQuorum(), } - // Shortcut for APN/1.0 Veeam/1.0 Backup/10.0 - // It requests unique blocks with a specific prefix. - // We skip scanning the parent directory for - // more objects matching the prefix. - ri := logger.GetReqInfo(ctx) - if ri != nil && strings.Contains(ri.UserAgent, `1.0 Veeam/1.0 Backup`) && strings.HasSuffix(prefix, ".blk") { - opts.discardResult = true - opts.Transient = true - } - - merged, err := z.listPath(ctx, opts) + merged, err := z.listPath(ctx, &opts) if err != nil && err != io.EOF { return loi, err } if versionMarker == "" { + o := listPathOptions{Marker: marker} // If we are not looking for a specific version skip it. - marker, _ = parseMarker(marker) - merged.forwardPast(marker) + + o.parseMarker() + merged.forwardPast(o.Marker) } objects := merged.fileInfoVersions(bucket, prefix, delimiter, versionMarker) loi.IsTruncated = err == nil && len(objects) > 0 @@ -982,7 +973,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre } if loi.IsTruncated { last := objects[len(objects)-1] - loi.NextMarker = encodeMarker(last.Name, merged.listID) + loi.NextMarker = opts.encodeMarker(last.Name) loi.NextVersionIDMarker = last.VersionID } return loi, nil @@ -1000,8 +991,7 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int { func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { var loi ListObjectsInfo - - merged, err := z.listPath(ctx, listPathOptions{ + opts := listPathOptions{ Bucket: bucket, Prefix: prefix, Separator: delimiter, @@ -1009,13 +999,14 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma Marker: marker, InclDeleted: false, AskDisks: globalAPIConfig.getListQuorum(), - }) + } + merged, err := z.listPath(ctx, &opts) if err != nil && err != io.EOF { logger.LogIf(ctx, err) return loi, err } - marker, _ = parseMarker(marker) - merged.forwardPast(marker) + + merged.forwardPast(opts.Marker) // Default is recursive, if delimiter is set then list non recursive. objects := merged.fileInfos(bucket, prefix, delimiter) @@ -1033,7 +1024,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma } if loi.IsTruncated { last := objects[len(objects)-1] - loi.NextMarker = encodeMarker(last.Name, merged.listID) + loi.NextMarker = opts.encodeMarker(last.Name) } return loi, nil } diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 768a7293a..69d650140 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -34,7 +34,6 @@ type apiConfig struct { requestsPool chan struct{} clusterDeadline time.Duration listQuorum int - extendListLife time.Duration corsAllowOrigins []string // total drives per erasure set across pools. totalDriveCount int @@ -81,7 +80,6 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { } t.requestsDeadline = cfg.RequestsDeadline t.listQuorum = cfg.GetListQuorum() - t.extendListLife = cfg.ExtendListLife if globalReplicationPool != nil && cfg.ReplicationWorkers != t.replicationWorkers { globalReplicationPool.ResizeFailedWorkers(cfg.ReplicationFailedWorkers) @@ -98,13 +96,6 @@ func (t *apiConfig) getListQuorum() int { return t.listQuorum } -func (t *apiConfig) getExtendListLife() time.Duration { - t.mu.RLock() - defer t.mu.RUnlock() - - return t.extendListLife -} - func (t *apiConfig) getCorsAllowOrigins() []string { t.mu.RLock() defer t.mu.RUnlock() diff --git a/cmd/metacache-bucket.go b/cmd/metacache-bucket.go index f82425e9d..e8485e378 100644 --- a/cmd/metacache-bucket.go +++ b/cmd/metacache-bucket.go @@ -23,10 +23,8 @@ import ( "errors" "fmt" "net/http" - "path" "runtime/debug" "sort" - "strings" "sync" "time" @@ -51,9 +49,8 @@ type bucketMetacache struct { cachesRoot map[string][]string `msg:"-"` // Internal state - mu sync.RWMutex `msg:"-"` - updated bool `msg:"-"` - transient bool `msg:"-"` // bucket used for non-persisted caches. + mu sync.RWMutex `msg:"-"` + updated bool `msg:"-"` } // newBucketMetacache creates a new bucketMetacache. @@ -146,9 +143,6 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, // save the bucket cache to the object storage. func (b *bucketMetacache) save(ctx context.Context) error { - if b.transient { - return nil - } objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized @@ -195,76 +189,24 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { return metacache{} } - if o.Bucket != b.bucket && !b.transient { + if o.Bucket != b.bucket { logger.Info("bucketMetacache.findCache: bucket %s does not match this bucket %s", o.Bucket, b.bucket) debug.PrintStack() return metacache{} } - extend := globalAPIConfig.getExtendListLife() - // Grab a write lock, since we create one if we cannot find one. - if o.Create { - b.mu.Lock() - defer b.mu.Unlock() - } else { - b.mu.RLock() - defer b.mu.RUnlock() - } + b.mu.Lock() + defer b.mu.Unlock() // Check if exists already. if c, ok := b.caches[o.ID]; ok { + c.lastHandout = time.Now() + b.caches[o.ID] = c b.debugf("returning existing %v", o.ID) return c } - // No need to do expensive checks on transients. - if b.transient { - if !o.Create { - return metacache{ - id: o.ID, - bucket: o.Bucket, - status: scanStateNone, - } - } - // Create new - best := o.newMetacache() - b.caches[o.ID] = best - b.updated = true - b.debugf("returning new cache %s, bucket: %v", best.id, best.bucket) - return best - } - - var best metacache - rootSplit := strings.Split(o.BaseDir, slashSeparator) - for i := range rootSplit { - interesting := b.cachesRoot[path.Join(rootSplit[:i+1]...)] - - for _, id := range interesting { - cached, ok := b.caches[id] - if !ok { - continue - } - if !cached.matches(&o, extend) { - continue - } - if cached.started.Before(best.started) { - b.debugf("cache %s disregarded - we have a better", cached.id) - // If we already have a newer, keep that. - continue - } - best = cached - } - } - if !best.started.IsZero() { - if o.Create { - best.lastHandout = UTCNow() - b.caches[best.id] = best - b.updated = true - } - b.debugf("returning cached %s, status: %v, ended: %v", best.id, best.status, best.ended) - return best - } if !o.Create { return metacache{ id: o.ID, @@ -274,7 +216,7 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { } // Create new and add. - best = o.newMetacache() + best := o.newMetacache() b.caches[o.ID] = best b.cachesRoot[best.root] = append(b.cachesRoot[best.root], best.id) b.updated = true @@ -286,19 +228,13 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { func (b *bucketMetacache) cleanup() { // Entries to remove. remove := make(map[string]struct{}) - currentCycle := intDataUpdateTracker.current() // Test on a copy // cleanup is the only one deleting caches. - caches, rootIdx := b.cloneCaches() + caches, _ := b.cloneCaches() for id, cache := range caches { - if b.transient && time.Since(cache.lastUpdate) > 10*time.Minute && time.Since(cache.lastHandout) > 10*time.Minute { - // Keep transient caches only for 15 minutes. - remove[id] = struct{}{} - continue - } - if !cache.worthKeeping(currentCycle) { + if !cache.worthKeeping() { b.debugf("cache %s not worth keeping", id) remove[id] = struct{}{} continue @@ -308,44 +244,13 @@ func (b *bucketMetacache) cleanup() { remove[id] = struct{}{} continue } - if cache.bucket != b.bucket && !b.transient { + if cache.bucket != b.bucket { logger.Info("cache bucket mismatch %s != %s", b.bucket, cache.bucket) remove[id] = struct{}{} continue } } - // Check all non-deleted against eachother. - // O(n*n), but should still be rather quick. - for id, cache := range caches { - if b.transient { - break - } - if _, ok := remove[id]; ok { - continue - } - - interesting := interestingCaches(cache.root, rootIdx) - for _, id2 := range interesting { - if _, ok := remove[id2]; ok || id2 == id { - // Don't check against one we are already removing - continue - } - cache2, ok := caches[id2] - if !ok { - continue - } - - if cache.canBeReplacedBy(&cache2) { - b.debugf("cache %s can be replaced by %s", id, cache2.id) - remove[id] = struct{}{} - break - } else { - b.debugf("cache %s can be NOT replaced by %s", id, cache2.id) - } - } - } - // If above limit, remove the caches with the oldest handout time. if len(caches)-len(remove) > metacacheMaxEntries { remainCaches := make([]metacache, 0, len(caches)-len(remove)) @@ -374,18 +279,6 @@ func (b *bucketMetacache) cleanup() { } } -// Potentially interesting caches. -// Will only add root if request is for root. -func interestingCaches(root string, cachesRoot map[string][]string) []string { - var interesting []string - rootSplit := strings.Split(root, slashSeparator) - for i := range rootSplit { - want := path.Join(rootSplit[:i+1]...) - interesting = append(interesting, cachesRoot[want]...) - } - return interesting -} - // updateCacheEntry will update a cache. // Returns the updated status. func (b *bucketMetacache) updateCacheEntry(update metacache) (metacache, error) { @@ -434,25 +327,10 @@ func (b *bucketMetacache) deleteAll() { defer b.mu.Unlock() b.updated = true - if !b.transient { - // Delete all. - ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator)) - b.caches = make(map[string]metacache, 10) - b.cachesRoot = make(map[string][]string, 10) - return - } - - // Transient are in different buckets. - var wg sync.WaitGroup - for id := range b.caches { - wg.Add(1) - go func(cache metacache) { - defer wg.Done() - ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id)) - }(b.caches[id]) - } - wg.Wait() + // Delete all. + ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator)) b.caches = make(map[string]metacache, 10) + b.cachesRoot = make(map[string][]string, 10) } // deleteCache will delete a specific cache and all files related to it across the cluster. diff --git a/cmd/metacache-bucket_test.go b/cmd/metacache-bucket_test.go index bdbe9f205..d4266bf45 100644 --- a/cmd/metacache-bucket_test.go +++ b/cmd/metacache-bucket_test.go @@ -46,8 +46,6 @@ func Benchmark_bucketMetacache_findCache(b *testing.B) { Recursive: false, Separator: slashSeparator, Create: true, - CurrentCycle: uint64(i), - OldestCycle: uint64(i - 1), }) } b.ReportAllocs() @@ -65,8 +63,6 @@ func Benchmark_bucketMetacache_findCache(b *testing.B) { Recursive: false, Separator: slashSeparator, Create: true, - CurrentCycle: uint64(i % elements), - OldestCycle: uint64(0), }) } } diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index b52dde9cb..36eae12db 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -19,9 +19,12 @@ package cmd import ( "bytes" + "context" "os" "sort" "strings" + + "github.com/minio/pkg/console" ) // metaCacheEntry is an object or a directory within an unknown bucket. @@ -38,7 +41,7 @@ type metaCacheEntry struct { // isDir returns if the entry is representing a prefix directory. func (e metaCacheEntry) isDir() bool { - return len(e.metadata) == 0 + return len(e.metadata) == 0 && strings.HasSuffix(e.name, slashSeparator) } // isObject returns if the entry is representing an object. @@ -51,15 +54,6 @@ func (e metaCacheEntry) hasPrefix(s string) bool { return strings.HasPrefix(e.name, s) } -// likelyMatches returns if the entries match by comparing name and metadata length. -func (e *metaCacheEntry) likelyMatches(other *metaCacheEntry) bool { - // This should reject 99% - if len(e.metadata) != len(other.metadata) || e.name != other.name { - return false - } - return true -} - // matches returns if the entries match by comparing their latest version fileinfo. func (e *metaCacheEntry) matches(other *metaCacheEntry, bucket string) bool { if e == nil && other == nil { @@ -510,6 +504,111 @@ func (m *metaCacheEntriesSorted) forwardPast(s string) { m.o = m.o[idx:] } +// mergeEntryChannels will merge entries from in and return them sorted on out. +// To signify no more results are on an input channel, close it. +// The output channel will be closed when all inputs are emptied. +// If file names are equal, compareMeta is called to select which one to choose. +// The entry not chosen will be discarded. +// If the context is canceled the function will return the error, +// otherwise the function will return nil. +func mergeEntryChannels(ctx context.Context, in []chan metaCacheEntry, out chan<- metaCacheEntry, compareMeta func(existing, other *metaCacheEntry) (replace bool)) error { + defer close(out) + top := make([]*metaCacheEntry, len(in)) + nDone := 0 + ctxDone := ctx.Done() + + // Use simpler forwarder. + if len(in) == 1 { + for { + select { + case <-ctxDone: + return ctx.Err() + case v, ok := <-in[0]: + if !ok { + return nil + } + select { + case <-ctxDone: + return ctx.Err() + case out <- v: + } + } + } + } + + selectFrom := func(idx int) error { + select { + case <-ctxDone: + return ctx.Err() + case entry, ok := <-in[idx]: + if !ok { + top[idx] = nil + nDone++ + } else { + top[idx] = &entry + } + } + return nil + } + // Populate all... + for i := range in { + if err := selectFrom(i); err != nil { + return err + } + } + last := "" + + // Choose the best to return. + for { + if nDone == len(in) { + return nil + } + best := top[0] + bestIdx := 0 + for i, other := range top[1:] { + otherIdx := i + 1 + if other == nil { + continue + } + if best == nil { + best = other + bestIdx = otherIdx + continue + } + if best.name == other.name { + if compareMeta(best, other) { + // Replace "best" + if err := selectFrom(bestIdx); err != nil { + return err + } + best = other + bestIdx = otherIdx + } else { + // Keep best, replace "other" + if err := selectFrom(otherIdx); err != nil { + return err + } + } + continue + } + if best.name > other.name { + best = other + bestIdx = otherIdx + } + } + if best.name > last { + out <- *best + last = best.name + } else { + console.Debugln("mergeEntryChannels: discarding duplicate", best.name, "<=", last) + } + // Replace entry we just sent. + if err := selectFrom(bestIdx); err != nil { + return err + } + } +} + // merge will merge other into m. // If the same entries exists in both and metadata matches only one is added, // otherwise the entry from m will be placed first. @@ -633,44 +732,3 @@ func (m *metaCacheEntriesSorted) entries() metaCacheEntries { } return m.o } - -// deduplicate entries in the list. -// If compareMeta is set it will be used to resolve conflicts. -// The function should return whether the existing entry should be replaced with other. -// If no compareMeta is provided duplicates may be left. -// This is indicated by the returned boolean. -func (m *metaCacheEntriesSorted) deduplicate(compareMeta func(existing, other *metaCacheEntry) (replace bool)) (dupesLeft bool) { - dst := m.o[:0] - for j := range m.o { - found := false - obj := &m.o[j] - for i := len(dst) - 1; i >= 0; i++ { - existing := &dst[i] - if existing.name != obj.name { - break - } - - // Use given resolution function first if any. - if compareMeta != nil { - if compareMeta(existing, obj) { - dst[i] = *obj - } - found = true - break - } - if obj.likelyMatches(existing) { - found = true - break - } - - // Matches, move on. - dupesLeft = true - continue - } - if !found { - dst = append(dst, *obj) - } - } - m.o = dst - return dupesLeft -} diff --git a/cmd/metacache-entries_test.go b/cmd/metacache-entries_test.go index 380933263..518bd12b6 100644 --- a/cmd/metacache-entries_test.go +++ b/cmd/metacache-entries_test.go @@ -18,7 +18,6 @@ package cmd import ( - "bytes" "reflect" "sort" "testing" @@ -96,51 +95,6 @@ func Test_metaCacheEntries_merge(t *testing.T) { } } -func Test_metaCacheEntries_dedupe(t *testing.T) { - org := loadMetacacheSampleEntries(t) - a, b := org.shallowClone(), org.shallowClone() - - // Merge b into a - a.merge(b, -1) - if a.deduplicate(nil) { - t.Fatal("deduplicate returned duplicate entries left") - } - want := loadMetacacheSampleNames - got := a.entries().names() - if !reflect.DeepEqual(want, got) { - t.Errorf("got unexpected result: %#v", got) - } -} - -func Test_metaCacheEntries_dedupe2(t *testing.T) { - org := loadMetacacheSampleEntries(t) - a, b := org.shallowClone(), org.shallowClone() - - // Replace metadata in b - testMarker := []byte("sampleset") - for i := range b.o { - b.o[i].metadata = testMarker - } - - // Merge b into a - a.merge(b, -1) - if a.deduplicate(func(existing, other *metaCacheEntry) (replace bool) { - a := bytes.Equal(existing.metadata, testMarker) - b := bytes.Equal(other.metadata, testMarker) - if a == b { - t.Fatal("got same number of testmarkers, only one should be given", a, b) - } - return b - }) { - t.Fatal("deduplicate returned duplicate entries left, we should always resolve") - } - want := loadMetacacheSampleNames - got := a.entries().names() - if !reflect.DeepEqual(want, got) { - t.Errorf("got unexpected result: %#v", got) - } -} - func Test_metaCacheEntries_filterObjects(t *testing.T) { data := loadMetacacheSampleEntries(t) data.filterObjectsOnly() diff --git a/cmd/metacache-manager.go b/cmd/metacache-manager.go index 8b3fb350f..63a41618d 100644 --- a/cmd/metacache-manager.go +++ b/cmd/metacache-manager.go @@ -43,16 +43,11 @@ type metacacheManager struct { trash map[string]metacache // Recently deleted lists. } -const metacacheManagerTransientBucket = "**transient**" const metacacheMaxEntries = 5000 // initManager will start async saving the cache. func (m *metacacheManager) initManager() { // Add a transient bucket. - tb := newBucketMetacache(metacacheManagerTransientBucket, false) - tb.transient = true - m.buckets[metacacheManagerTransientBucket] = tb - // Start saver when object layer is ready. go func() { objAPI := newObjectLayerFn() @@ -96,25 +91,6 @@ func (m *metacacheManager) initManager() { }() } -// findCache will get a metacache. -func (m *metacacheManager) findCache(ctx context.Context, o listPathOptions) metacache { - if o.Transient || isReservedOrInvalidBucket(o.Bucket, false) { - return m.getTransient().findCache(o) - } - m.mu.RLock() - b, ok := m.buckets[o.Bucket] - if ok { - m.mu.RUnlock() - return b.findCache(o) - } - if meta, ok := m.trash[o.ID]; ok { - m.mu.RUnlock() - return meta - } - m.mu.RUnlock() - return m.getBucket(ctx, o.Bucket).findCache(o) -} - // updateCacheEntry will update non-transient state. func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error) { m.mu.RLock() @@ -138,9 +114,6 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket m.init.Do(m.initManager) // Return a transient bucket for invalid or system buckets. - if isReservedOrInvalidBucket(bucket, false) { - return m.getTransient() - } m.mu.RLock() b, ok := m.buckets[bucket] m.mu.RUnlock() @@ -167,9 +140,7 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket // Load bucket. If we fail return the transient bucket. b, err := loadBucketMetaCache(ctx, bucket) if err != nil { - m.mu.Unlock() logger.LogIf(ctx, err) - return m.getTransient() } if b.bucket != bucket { logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket)) @@ -215,36 +186,20 @@ func (m *metacacheManager) deleteAll() { defer m.mu.Unlock() for bucket, b := range m.buckets { b.deleteAll() - if !b.transient { - delete(m.buckets, bucket) - } + delete(m.buckets, bucket) } } -// getTransient will return a transient bucket. -func (m *metacacheManager) getTransient() *bucketMetacache { - m.init.Do(m.initManager) - m.mu.RLock() - bmc := m.buckets[metacacheManagerTransientBucket] - m.mu.RUnlock() - return bmc -} - // checkMetacacheState should be used if data is not updating. // Should only be called if a failure occurred. func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTClient) error { // We operate on a copy... o.Create = false - var cache metacache - if rpc == nil || o.Transient { - cache = localMetacacheMgr.findCache(ctx, o) - } else { - c, err := rpc.GetMetacacheListing(ctx, o) - if err != nil { - return err - } - cache = *c + c, err := rpc.GetMetacacheListing(ctx, o) + if err != nil { + return err } + cache := *c if cache.status == scanStateNone || cache.fileNotFound { return errFileNotFound @@ -255,11 +210,7 @@ func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTC err := fmt.Errorf("timeout: list %s not updated", cache.id) cache.error = err.Error() cache.status = scanStateError - if rpc == nil || o.Transient { - localMetacacheMgr.updateCacheEntry(cache) - } else { - rpc.UpdateMetacacheListing(ctx, cache) - } + rpc.UpdateMetacacheListing(ctx, cache) return err } return nil diff --git a/cmd/metacache-marker.go b/cmd/metacache-marker.go index a259e0f35..c62673140 100644 --- a/cmd/metacache-marker.go +++ b/cmd/metacache-marker.go @@ -20,6 +20,7 @@ package cmd import ( "context" "fmt" + "strconv" "strings" "github.com/minio/minio/internal/logger" @@ -27,15 +28,16 @@ import ( // markerTagVersion is the marker version. // Should not need to be updated unless a fundamental change is made to the marker format. -const markerTagVersion = "v1" +const markerTagVersion = "v2" // parseMarker will parse a marker possibly encoded with encodeMarker -func parseMarker(s string) (marker, uuid string) { +func (o *listPathOptions) parseMarker() { + s := o.Marker if !strings.Contains(s, "[minio_cache:"+markerTagVersion) { - return s, "" + return } start := strings.LastIndex(s, "[") - marker = s[:start] + o.Marker = s[:start] end := strings.LastIndex(s, "]") tag := strings.Trim(s[start:end], "[]") tags := strings.Split(tag, ",") @@ -50,22 +52,41 @@ func parseMarker(s string) (marker, uuid string) { break } case "id": - uuid = kv[1] + o.ID = kv[1] + case "return": + o.ID = mustGetUUID() + o.Create = true + case "p": // pool + v, err := strconv.ParseInt(kv[1], 10, 64) + if err != nil { + o.ID = mustGetUUID() + o.Create = true + continue + } + o.pool = int(v) + case "s": // set + v, err := strconv.ParseInt(kv[1], 10, 64) + if err != nil { + o.ID = mustGetUUID() + o.Create = true + continue + } + o.set = int(v) default: // Ignore unknown } } - return } // encodeMarker will encode a uuid and return it as a marker. // uuid cannot contain '[', ':' or ','. -func encodeMarker(marker, uuid string) string { - if uuid == "" { - return marker +func (o listPathOptions) encodeMarker(marker string) string { + if o.ID == "" { + // Mark as returning listing... + return fmt.Sprintf("%s[minio_cache:%s,return:]", marker, markerTagVersion) } - if strings.ContainsAny(uuid, "[:,") { - logger.LogIf(context.Background(), fmt.Errorf("encodeMarker: uuid %s contained invalid characters", uuid)) + if strings.ContainsAny(o.ID, "[:,") { + logger.LogIf(context.Background(), fmt.Errorf("encodeMarker: uuid %s contained invalid characters", o.ID)) } - return fmt.Sprintf("%s[minio_cache:%s,id:%s]", marker, markerTagVersion, uuid) + return fmt.Sprintf("%s[minio_cache:%s,id:%s,p:%d,s:%d]", marker, markerTagVersion, o.ID, o.pool, o.set) } diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 831b66860..dd1a63e0e 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "os" - "path" pathutil "path" "strings" "sync" @@ -57,7 +56,7 @@ func renameAllBucketMetacache(epPath string) error { // Required important fields are Bucket, Prefix, Separator. // Other important fields are Limit, Marker. // List ID always derived from the Marker. -func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) { +func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) { if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker, z); err != nil { return entries, err } @@ -95,140 +94,217 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e } // Decode and get the optional list id from the marker. - o.Marker, o.ID = parseMarker(o.Marker) - o.Create = o.ID == "" - if o.ID == "" { - o.ID = mustGetUUID() - } + o.parseMarker() o.BaseDir = baseDirFromPrefix(o.Prefix) - if o.discardResult { - // Override for single object. - o.BaseDir = o.Prefix + o.Transient = o.Transient || isReservedOrInvalidBucket(o.Bucket, false) + if o.Transient { + o.Create = false } - // For very small recursive listings, don't same cache. - // Attempts to avoid expensive listings to run for a long - // while when clients aren't interested in results. - // If the client DOES resume the listing a full cache - // will be generated due to the marker without ID and this check failing. - if o.Limit < 10 && o.Marker == "" && o.Create && o.Recursive { - o.discardResult = true - o.Transient = true - } - - var cache metacache + // We have 2 cases: + // 1) Cold listing, just list. + // 2) Returning, but with no id. Start async listing. + // 3) Returning, with ID, stream from list. + // // If we don't have a list id we must ask the server if it has a cache or create a new. - if o.Create { - o.CurrentCycle = intDataUpdateTracker.current() - o.OldestCycle = globalNotificationSys.findEarliestCleanBloomFilter(ctx, path.Join(o.Bucket, o.BaseDir)) - var cache metacache + if o.ID != "" && !o.Transient { + // Create or ping with handout... rpc := globalNotificationSys.restClientFromHash(o.Bucket) - if isReservedOrInvalidBucket(o.Bucket, false) { - rpc = nil - o.Transient = true - } - // Apply prefix filter if enabled. - o.SetFilter() - if rpc == nil || o.Transient { - // Local - cache = localMetacacheMgr.findCache(ctx, o) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + var c *metacache + if rpc == nil { + resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o) + c = &resp } else { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - c, err := rpc.GetMetacacheListing(ctx, o) - if err != nil { - if errors.Is(err, context.Canceled) { - // Context is canceled, return at once. - // request canceled, no entries to return - return entries, io.EOF - } - if !errors.Is(err, context.DeadlineExceeded) { - logger.LogIf(ctx, err) - } - o.Transient = true - cache = localMetacacheMgr.findCache(ctx, o) + c, err = rpc.GetMetacacheListing(ctx, *o) + } + if err != nil { + if errors.Is(err, context.Canceled) { + // Context is canceled, return at once. + // request canceled, no entries to return + return entries, io.EOF + } + if !errors.Is(err, context.DeadlineExceeded) { + // TODO: Remove, not really informational. + logger.LogIf(ctx, err) + o.debugln("listPath: deadline exceeded") + } + o.Transient = true + o.Create = false + o.ID = mustGetUUID() + } else { + if c.fileNotFound { + // No cache found, no entries found. + return entries, io.EOF + } + if c.status == scanStateError || c.status == scanStateNone { + o.ID = "" + o.Create = false + o.debugln("scan status", c.status, " - waiting a roundtrip to create") } else { - cache = *c + // Continue listing + o.ID = c.id } } - if cache.fileNotFound { - // No cache found, no entries found. - return entries, io.EOF - } - // Only create if we created a new. - o.Create = o.ID == cache.id - o.ID = cache.id } + if o.ID != "" && !o.Transient { + // We have an existing list ID, continue streaming. + if o.Create { + o.debugln("Creating", o) + entries, err = z.listAndSave(ctx, o) + if err == nil || err == io.EOF { + return entries, err + } + entries.truncate(0) + } else { + if o.pool < len(z.serverPools) && o.set < len(z.serverPools[o.pool].sets) { + o.debugln("Resuming", o) + entries, err = z.serverPools[o.pool].sets[o.set].streamMetadataParts(ctx, *o) + if err == nil { + return entries, nil + } + } else { + err = fmt.Errorf("invalid pool/set") + o.pool, o.set = 0, 0 + } + } + if IsErr(err, []error{ + nil, + context.Canceled, + context.DeadlineExceeded, + // io.EOF is expected and should be returned but no need to log it. + io.EOF, + }...) { + // Expected good errors we don't need to return error. + return entries, err + } + entries.truncate(0) + go func() { + rpc := globalNotificationSys.restClientFromHash(o.Bucket) + ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) + defer cancel() + if c, err := rpc.GetMetacacheListing(ctx, *o); err == nil { + c.error = "no longer used" + c.status = scanStateError + } + }() + o.ID = "" + } + + // Do listing in-place. + // Create output for our results. + // Create filter for results. + o.debugln("Raw List", o) + filterCh := make(chan metaCacheEntry, o.Limit) + filteredResults := o.gatherResults(filterCh) + listCtx, cancelList := context.WithCancel(ctx) + var wg sync.WaitGroup + wg.Add(1) + var listErr error + + go func(o listPathOptions) { + defer wg.Done() + o.Limit = 0 + listErr = z.listMerged(listCtx, o, filterCh) + o.debugln("listMerged returned with", listErr) + }(*o) + + entries, err = filteredResults() + cancelList() + wg.Wait() + if listErr != nil && !errors.Is(listErr, context.Canceled) { + return entries, listErr + } + truncated := entries.len() > o.Limit || err == nil + entries.truncate(o.Limit) + if !o.Transient && truncated { + if o.ID == "" { + entries.listID = mustGetUUID() + } else { + entries.listID = o.ID + } + } + if !truncated { + return entries, io.EOF + } + return entries, nil +} + +// listMerged will list across all sets and return a merged results stream. +// The result channel is closed when no more results are expected. +func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) error { var mu sync.Mutex var wg sync.WaitGroup var errs []error allAtEOF := true + var inputs []chan metaCacheEntry mu.Lock() // Ask all sets and merge entries. + listCtx, cancelList := context.WithCancel(ctx) + defer cancelList() for _, pool := range z.serverPools { for _, set := range pool.sets { wg.Add(1) + results := make(chan metaCacheEntry, 100) + inputs = append(inputs, results) go func(i int, set *erasureObjects) { defer wg.Done() - e, err := set.listPath(ctx, o) + err := set.listPath(listCtx, o, results) mu.Lock() defer mu.Unlock() if err == nil { allAtEOF = false } errs[i] = err - entries.merge(e, -1) - - // Resolve non-trivial conflicts - entries.deduplicate(func(existing, other *metaCacheEntry) (replace bool) { - // Pick object over directory - if existing.isDir() && !other.isDir() { - return true - } - if !existing.isDir() && other.isDir() { - return false - } - eFIV, err := existing.fileInfo(o.Bucket) - if err != nil { - return true - } - oFIV, err := other.fileInfo(o.Bucket) - if err != nil { - return false - } - // Replace if modtime is newer - if !oFIV.ModTime.Equal(eFIV.ModTime) { - return oFIV.ModTime.After(eFIV.ModTime) - } - // Use NumVersions as a final tiebreaker. - return oFIV.NumVersions > eFIV.NumVersions - }) - if entries.len() > o.Limit { - allAtEOF = false - entries.truncate(o.Limit) - } }(len(errs), set) errs = append(errs, nil) } } mu.Unlock() + + // Gather results to a single channel. + err := mergeEntryChannels(ctx, inputs, results, func(existing, other *metaCacheEntry) (replace bool) { + // Pick object over directory + if existing.isDir() && !other.isDir() { + return true + } + if !existing.isDir() && other.isDir() { + return false + } + + eFIV, err := existing.fileInfo(o.Bucket) + if err != nil { + return true + } + oFIV, err := other.fileInfo(o.Bucket) + if err != nil { + return false + } + // Replace if modtime is newer + if !oFIV.ModTime.Equal(eFIV.ModTime) { + return oFIV.ModTime.After(eFIV.ModTime) + } + // Use NumVersions as a final tiebreaker. + return oFIV.NumVersions > eFIV.NumVersions + }) + + cancelList() wg.Wait() + if err != nil { + return err + } + if contextCanceled(ctx) { + return ctx.Err() + } if isAllNotFound(errs) { - // All sets returned not found. - go func() { - // Update master cache with that information. - cache.status = scanStateSuccess - cache.fileNotFound = true - o.updateMetacacheListing(cache, globalNotificationSys.restClientFromHash(o.Bucket)) - }() - // cache returned not found, entries truncated. - return entries, io.EOF + return nil } for _, err := range errs { - if err == nil { + if err == nil || contextCanceled(ctx) { allAtEOF = false continue } @@ -236,15 +312,66 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e continue } logger.LogIf(ctx, err) - return entries, err + return err } - truncated := entries.len() > o.Limit || !allAtEOF - entries.truncate(o.Limit) - if !o.discardResult { - entries.listID = o.ID + if allAtEOF { + // TODO" Maybe, maybe not + return io.EOF } - if !truncated { - return entries, io.EOF - } - return entries, nil + return nil +} + +func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) { + // Use ID as the object name... + o.pool = z.getAvailablePoolIdx(ctx, minioMetaBucket, o.ID, 10<<20) + if o.pool < 0 { + // No space or similar, don't persist the listing. + o.pool = 0 + o.Create = false + o.ID = "" + o.Transient = true + return entries, errDiskFull + } + o.set = z.serverPools[o.pool].getHashedSetIndex(o.ID) + saver := z.serverPools[o.pool].sets[o.set] + + // Disconnect from call above, but cancel on exit. + listCtx, cancel := context.WithCancel(GlobalContext) + saveCh := make(chan metaCacheEntry, metacacheBlockSize) + inCh := make(chan metaCacheEntry, metacacheBlockSize) + outCh := make(chan metaCacheEntry, o.Limit) + + filteredResults := o.gatherResults(outCh) + + mc := o.newMetacache() + meta := metaCacheRPC{meta: &mc, cancel: cancel, rpc: globalNotificationSys.restClientFromHash(o.Bucket), o: *o} + + // Save listing... + go func() { + if err := saver.saveMetaCacheStream(listCtx, &meta, saveCh); err != nil { + meta.setErr(err.Error()) + } + cancel() + }() + + // Do listing... + go func(o listPathOptions) { + err := z.listMerged(listCtx, o, inCh) + if err != nil { + meta.setErr(err.Error()) + } + o.debugln("listAndSave: listing", o.ID, "finished with ", err) + }(*o) + + // Write listing to results and saver. + go func() { + for entry := range inCh { + outCh <- entry + saveCh <- entry + } + close(outCh) + close(saveCh) + }() + + return filteredResults() } diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 6ccd1a190..3b172690f 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "io" + "math/rand" "strconv" "strings" "sync" @@ -82,13 +83,6 @@ type listPathOptions struct { // Create indicates that the lister should not attempt to load an existing cache. Create bool - // CurrentCycle indicates the current bloom cycle. - // Will be used if a new scan is started. - CurrentCycle uint64 - - // OldestCycle indicates the oldest cycle acceptable. - OldestCycle uint64 - // Include pure directories. IncludeDirectories bool @@ -97,9 +91,8 @@ type listPathOptions struct { // A transient result will never be returned from the cache so knowing the list id is required. Transient bool - // discardResult will not persist the cache to storage. - // When the initial results are returned listing will be canceled. - discardResult bool + // pool and set of where the cache is located. + pool, set int } func init() { @@ -109,20 +102,18 @@ func init() { // newMetacache constructs a new metacache from the options. func (o listPathOptions) newMetacache() metacache { return metacache{ - id: o.ID, - bucket: o.Bucket, - root: o.BaseDir, - recursive: o.Recursive, - status: scanStateStarted, - error: "", - started: UTCNow(), - lastHandout: UTCNow(), - lastUpdate: UTCNow(), - ended: time.Time{}, - startedCycle: o.CurrentCycle, - endedCycle: 0, - dataVersion: metacacheStreamVersion, - filter: o.FilterPrefix, + id: o.ID, + bucket: o.Bucket, + root: o.BaseDir, + recursive: o.Recursive, + status: scanStateStarted, + error: "", + started: UTCNow(), + lastHandout: UTCNow(), + lastUpdate: UTCNow(), + ended: time.Time{}, + dataVersion: metacacheStreamVersion, + filter: o.FilterPrefix, } } @@ -240,9 +231,6 @@ func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) { // updateMetacacheListing will update the metacache listing. func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClient) (metacache, error) { - if o.Transient { - return localMetacacheMgr.getTransient().updateCacheEntry(m) - } if rpc == nil { return localMetacacheMgr.updateCacheEntry(m) } @@ -274,9 +262,6 @@ func (o *listPathOptions) SetFilter() { switch { case metacacheSharePrefix: return - case o.CurrentCycle != o.OldestCycle: - // We have a clean bloom filter - return case o.Prefix == o.BaseDir: // No additional prefix return @@ -521,251 +506,207 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt } // Will return io.EOF if continuing would not yield more results. -func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) { +func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) (err error) { + defer close(results) o.debugf(color.Green("listPath:")+" with options: %#v", o) - // See if we have the listing stored. - if !o.Create && !o.discardResult { - entries, err := er.streamMetadataParts(ctx, o) - if IsErr(err, []error{ - nil, - context.Canceled, - context.DeadlineExceeded, - }...) { - // Expected good errors we don't need to return error. - return entries, nil - } - - if !errors.Is(err, io.EOF) { // io.EOF is expected and should be returned but no need to log it. - // Log an return errors on unexpected errors. - logger.LogIf(ctx, err) - } - - return entries, err - } - - meta := o.newMetacache() - rpc := globalNotificationSys.restClientFromHash(o.Bucket) - var metaMu sync.Mutex - - o.debugln(color.Green("listPath:")+" scanning bucket:", o.Bucket, "basedir:", o.BaseDir, "prefix:", o.Prefix, "marker:", o.Marker) - - // Disconnect from call above, but cancel on exit. - ctx, cancel := context.WithCancel(GlobalContext) - // We need to ask disks. + askDisks := o.AskDisks + listingQuorum := o.AskDisks - 1 disks := er.getOnlineDisks() - defer func() { - o.debugln(color.Green("listPath:")+" returning:", entries.len(), "err:", err) - if err != nil && !errors.Is(err, io.EOF) { - go func(err string) { - metaMu.Lock() - if meta.status != scanStateError { - meta.error = err - meta.status = scanStateError - } - meta, _ = o.updateMetacacheListing(meta, rpc) - metaMu.Unlock() - }(err.Error()) - cancel() - } - }() - - askDisks := o.AskDisks - if askDisks == 0 { - askDisks = globalAPIConfig.getListQuorum() - } - // make sure atleast default '3' lists object is present. - listingQuorum := askDisks + // Special case: ask all disks if the drive count is 4 if askDisks == -1 || er.setDriveCount == 4 { askDisks = len(disks) // with 'strict' quorum list on all online disks. listingQuorum = getReadQuorum(er.setDriveCount) } - - if len(disks) < askDisks { - err = InsufficientReadQuorum{} - logger.LogIf(ctx, fmt.Errorf("listPath: Insufficient disks, %d of %d needed are available", len(disks), askDisks)) - cancel() - return + if askDisks == 0 { + askDisks = globalAPIConfig.getListQuorum() + listingQuorum = askDisks } - - // Select askDisks random disks. - if len(disks) > askDisks { + if askDisks > 0 && len(disks) > askDisks { + rand.Shuffle(len(disks), func(i, j int) { + disks[i], disks[j] = disks[j], disks[i] + }) disks = disks[:askDisks] } - // Create output for our results. - var cacheCh chan metaCacheEntry - if !o.discardResult { - cacheCh = make(chan metaCacheEntry, metacacheBlockSize) + // How to resolve results. + resolver := metadataResolutionParams{ + dirQuorum: listingQuorum, + objQuorum: listingQuorum, + bucket: o.Bucket, } - // Create filter for results. - filterCh := make(chan metaCacheEntry, 100) - filteredResults := o.gatherResults(filterCh) - closeChannels := func() { - if !o.discardResult { - close(cacheCh) - } - close(filterCh) - } - - // Cancel listing on return if non-saved list. - if o.discardResult { - defer cancel() - } - - go func() { - defer cancel() - // Save continuous updates - go func() { - var err error - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - var exit bool - for !exit { + ctxDone := ctx.Done() + return listPathRaw(ctx, listPathRawOptions{ + disks: disks, + bucket: o.Bucket, + path: o.BaseDir, + recursive: o.Recursive, + filterPrefix: o.FilterPrefix, + minDisks: listingQuorum, + forwardTo: o.Marker, + agreed: func(entry metaCacheEntry) { + select { + case <-ctxDone: + case results <- entry: + } + }, + partial: func(entries metaCacheEntries, nAgreed int, errs []error) { + // Results Disagree :-( + entry, ok := entries.resolve(&resolver) + if ok { select { - case <-ticker.C: - case <-ctx.Done(): - exit = true + case <-ctxDone: + case results <- *entry: } - metaMu.Lock() - meta.endedCycle = intDataUpdateTracker.current() - meta, err = o.updateMetacacheListing(meta, rpc) - if meta.status == scanStateError { - logger.LogIf(ctx, err) - cancel() - exit = true - } - metaMu.Unlock() } - }() + }, + }) +} - const retryDelay = 200 * time.Millisecond - const maxTries = 5 +type metaCacheRPC struct { + o listPathOptions + mu sync.Mutex + meta *metacache + rpc *peerRESTClient + cancel context.CancelFunc +} - var bw *metacacheBlockWriter - // Don't save single object listings. - if !o.discardResult { - // Write results to disk. - bw = newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error { - // if the block is 0 bytes and its a first block skip it. - // skip only this for Transient caches. - if len(b.data) == 0 && b.n == 0 && o.Transient { - return nil - } - o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n)) - r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data))) - logger.LogIf(ctx, err) - custom := b.headerKV() - _, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{ - UserDefined: custom, - NoLock: true, // No need to hold namespace lock, each prefix caches uniquely. - ParentIsObject: nil, - }) - if err != nil { - metaMu.Lock() - if meta.error != "" { - meta.status = scanStateError - meta.error = err.Error() - } - metaMu.Unlock() - cancel() - return err - } - if b.n == 0 { - return nil - } - // Update block 0 metadata. - var retries int - for { - meta := b.headerKV() - fi := FileInfo{ - Metadata: make(map[string]string, len(meta)), - } - for k, v := range meta { - fi.Metadata[k] = v - } - err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi) - if err == nil { - break - } - switch err.(type) { - case ObjectNotFound: - return err - case InsufficientReadQuorum: - default: - logger.LogIf(ctx, err) - } - if retries >= maxTries { - return err - } - retries++ - time.Sleep(retryDelay) - } - return nil - }) - } +func (m *metaCacheRPC) setErr(err string) { + m.mu.Lock() + defer m.mu.Lock() + meta := *m.meta + if meta.status != scanStateError { + meta.error = err + meta.status = scanStateError + } else { + // An error is already set. + return + } + meta, _ = m.o.updateMetacacheListing(meta, m.rpc) + *m.meta = meta +} - // How to resolve results. - resolver := metadataResolutionParams{ - dirQuorum: listingQuorum, - objQuorum: listingQuorum, - bucket: o.Bucket, - } +func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCacheRPC, entries <-chan metaCacheEntry) (err error) { + o := mc.o + o.debugf(color.Green("saveMetaCacheStream:")+" with options: %#v", o) - err := listPathRaw(ctx, listPathRawOptions{ - disks: disks, - bucket: o.Bucket, - path: o.BaseDir, - recursive: o.Recursive, - filterPrefix: o.FilterPrefix, - minDisks: listingQuorum, - agreed: func(entry metaCacheEntry) { - if !o.discardResult { - cacheCh <- entry - } - filterCh <- entry - }, - partial: func(entries metaCacheEntries, nAgreed int, errs []error) { - // Results Disagree :-( - entry, ok := entries.resolve(&resolver) - if ok { - if !o.discardResult { - cacheCh <- *entry - } - filterCh <- *entry - } - }, - }) - - metaMu.Lock() - if err != nil { - meta.status = scanStateError - meta.error = err.Error() - } - // Save success - if meta.error == "" { - meta.status = scanStateSuccess - meta.endedCycle = intDataUpdateTracker.current() - } - - meta, _ = o.updateMetacacheListing(meta, rpc) - metaMu.Unlock() - - closeChannels() - if !o.discardResult { - if err := bw.Close(); err != nil { - metaMu.Lock() - meta.error = err.Error() - meta.status = scanStateError - meta, _ = o.updateMetacacheListing(meta, rpc) - metaMu.Unlock() - } + metaMu := &mc.mu + rpc := mc.rpc + cancel := mc.cancel + defer func() { + o.debugln(color.Green("saveMetaCacheStream:")+"err:", err) + if err != nil && !errors.Is(err, io.EOF) { + go mc.setErr(err.Error()) + cancel() } }() - return filteredResults() + defer cancel() + // Save continuous updates + go func() { + var err error + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + var exit bool + for !exit { + select { + case <-ticker.C: + case <-ctx.Done(): + exit = true + } + metaMu.Lock() + meta := *mc.meta + meta, err = o.updateMetacacheListing(meta, rpc) + *mc.meta = meta + if meta.status == scanStateError { + logger.LogIf(ctx, err) + cancel() + exit = true + } + metaMu.Unlock() + } + }() + + const retryDelay = 200 * time.Millisecond + const maxTries = 5 + + // Keep destination... + // Write results to disk. + bw := newMetacacheBlockWriter(entries, func(b *metacacheBlock) error { + // if the block is 0 bytes and its a first block skip it. + // skip only this for Transient caches. + if len(b.data) == 0 && b.n == 0 && o.Transient { + return nil + } + o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n)) + r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data))) + logger.LogIf(ctx, err) + custom := b.headerKV() + _, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{ + UserDefined: custom, + NoLock: true, // No need to hold namespace lock, each prefix caches uniquely. + ParentIsObject: nil, + }) + if err != nil { + mc.setErr(err.Error()) + cancel() + return err + } + if b.n == 0 { + return nil + } + // Update block 0 metadata. + var retries int + for { + meta := b.headerKV() + fi := FileInfo{ + Metadata: make(map[string]string, len(meta)), + } + for k, v := range meta { + fi.Metadata[k] = v + } + err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi) + if err == nil { + break + } + switch err.(type) { + case ObjectNotFound: + return err + case InsufficientReadQuorum: + default: + logger.LogIf(ctx, err) + } + if retries >= maxTries { + return err + } + retries++ + time.Sleep(retryDelay) + } + return nil + }) + + metaMu.Lock() + if err != nil { + mc.setErr(err.Error()) + return + } + // Save success + if mc.meta.error == "" { + mc.meta.status = scanStateSuccess + } + + meta := *mc.meta + meta, _ = o.updateMetacacheListing(meta, rpc) + *mc.meta = meta + metaMu.Unlock() + + if err := bw.Close(); err != nil { + mc.setErr(err.Error()) + } + + return } type listPathRawOptions struct { diff --git a/cmd/metacache-stream.go b/cmd/metacache-stream.go index 5eaa14d18..f3b1337de 100644 --- a/cmd/metacache-stream.go +++ b/cmd/metacache-stream.go @@ -52,7 +52,7 @@ import ( // Streams can be assumed to be sorted in ascending order. // If the stream ends before a false boolean it can be assumed it was truncated. -const metacacheStreamVersion = 1 +const metacacheStreamVersion = 2 // metacacheWriter provides a serializer of metacache objects. type metacacheWriter struct { @@ -262,7 +262,7 @@ func newMetacacheReader(r io.Reader) *metacacheReader { return err } switch v { - case metacacheStreamVersion: + case 1, 2: default: return fmt.Errorf("metacacheReader: Unknown version: %d", v) } diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 938551ddc..67349f74f 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -107,9 +107,18 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ } prefix := opts.FilterPrefix - forward := opts.ForwardTo var scanDir func(path string) error + scanDir = func(current string) error { + // Skip forward, if requested... + forward := "" + if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) { + forward = strings.TrimPrefix(opts.ForwardTo, current) + if idx := strings.IndexByte(forward, '/'); idx > 0 { + forward = forward[:idx] + } + } + if contextCanceled(ctx) { return ctx.Err() } @@ -187,6 +196,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ sort.Strings(entries) dirStack := make([]string, 0, 5) prefix = "" // Remove prefix after first level. + if len(forward) > 0 { + idx := sort.SearchStrings(entries, forward) + if idx > 0 { + entries = entries[idx:] + } + } for _, entry := range entries { if entry == "" { @@ -256,10 +271,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ out <- metaCacheEntry{name: pop} if opts.Recursive { // Scan folder we found. Should be in correct sort order where we are. - forward = "" - if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, pop) { - forward = strings.TrimPrefix(opts.ForwardTo, pop) - } logger.LogIf(ctx, scanDir(pop)) } dirStack = dirStack[:len(dirStack)-1] diff --git a/cmd/metacache.go b/cmd/metacache.go index fa84693ed..9c3a39cef 100644 --- a/cmd/metacache.go +++ b/cmd/metacache.go @@ -65,8 +65,6 @@ type metacache struct { ended time.Time `msg:"end"` lastUpdate time.Time `msg:"u"` lastHandout time.Time `msg:"lh"` - startedCycle uint64 `msg:"stc"` - endedCycle uint64 `msg:"endc"` dataVersion uint8 `msg:"v"` } @@ -74,66 +72,8 @@ func (m *metacache) finished() bool { return !m.ended.IsZero() } -// matches returns whether the metacache matches the options given. -func (m *metacache) matches(o *listPathOptions, extend time.Duration) bool { - if o == nil { - return false - } - - // Never return transient caches if there is no id. - if m.status == scanStateError || m.status == scanStateNone || m.dataVersion != metacacheStreamVersion { - o.debugf("cache %s state or stream version mismatch", m.id) - return false - } - if m.startedCycle < o.OldestCycle { - o.debugf("cache %s cycle too old", m.id) - return false - } - - // Root of what we are looking for must at least have the same - if !strings.HasPrefix(o.BaseDir, m.root) { - o.debugf("cache %s prefix mismatch, cached:%v, want:%v", m.id, m.root, o.BaseDir) - return false - } - if m.filter != "" && strings.HasPrefix(m.filter, o.FilterPrefix) { - o.debugf("cache %s cannot be used because of filter %s", m.id, m.filter) - return false - } - - if o.Recursive && !m.recursive { - o.debugf("cache %s not recursive", m.id) - // If this is recursive the cached listing must be as well. - return false - } - if o.Separator != slashSeparator && !m.recursive { - o.debugf("cache %s not slashsep and not recursive", m.id) - // Non slash separator requires recursive. - return false - } - if !m.finished() && time.Since(m.lastUpdate) > metacacheMaxRunningAge { - o.debugf("cache %s not running, time: %v", m.id, time.Since(m.lastUpdate)) - // Abandoned - return false - } - - if m.finished() && m.endedCycle <= o.OldestCycle { - if extend <= 0 { - // If scan has ended the oldest requested must be less. - o.debugf("cache %s ended and cycle (%v) <= oldest allowed (%v)", m.id, m.endedCycle, o.OldestCycle) - return false - } - if time.Since(m.lastUpdate) > metacacheMaxRunningAge+extend { - // Cache ended within bloom cycle, but we can extend the life. - o.debugf("cache %s ended (%v) and beyond extended life (%v)", m.id, m.lastUpdate, metacacheMaxRunningAge+extend) - return false - } - } - - return true -} - // worthKeeping indicates if the cache by itself is worth keeping. -func (m *metacache) worthKeeping(currentCycle uint64) bool { +func (m *metacache) worthKeeping() bool { if m == nil { return false } @@ -142,59 +82,16 @@ func (m *metacache) worthKeeping(currentCycle uint64) bool { case !cache.finished() && time.Since(cache.lastUpdate) > metacacheMaxRunningAge: // Not finished and update for metacacheMaxRunningAge, discard it. return false - case cache.finished() && cache.startedCycle > currentCycle: - // Cycle is somehow bigger. - return false - case cache.finished() && time.Since(cache.lastHandout) > 48*time.Hour: - // Keep only for 2 days. Fallback if scanner is clogged. - return false - case cache.finished() && currentCycle >= dataUsageUpdateDirCycles && cache.startedCycle < currentCycle-dataUsageUpdateDirCycles: - // Cycle is too old to be valuable. + case cache.finished() && time.Since(cache.lastHandout) > 30*time.Minute: + // Keep only for 30 minutes. return false case cache.status == scanStateError || cache.status == scanStateNone: // Remove failed listings after 5 minutes. - return time.Since(cache.lastUpdate) < 5*time.Minute + return time.Since(cache.lastUpdate) > 5*time.Minute } return true } -// canBeReplacedBy. -// Both must pass the worthKeeping check. -func (m *metacache) canBeReplacedBy(other *metacache) bool { - // If the other is older it can never replace. - if other.started.Before(m.started) || m.id == other.id { - return false - } - if other.status == scanStateNone || other.status == scanStateError { - return false - } - if m.status == scanStateStarted && time.Since(m.lastUpdate) < metacacheMaxRunningAge { - return false - } - - // Keep it around a bit longer. - if time.Since(m.lastHandout) < 30*time.Minute || time.Since(m.lastUpdate) < metacacheMaxRunningAge { - return false - } - - // Go through recursive combinations. - switch { - case !m.recursive && !other.recursive: - // If both not recursive root must match. - return m.root == other.root && strings.HasPrefix(m.filter, other.filter) - case m.recursive && !other.recursive: - // A recursive can never be replaced by a non-recursive - return false - case !m.recursive && other.recursive: - // If other is recursive it must contain this root - return strings.HasPrefix(m.root, other.root) && other.filter == "" - case m.recursive && other.recursive: - // Similar if both are recursive - return strings.HasPrefix(m.root, other.root) && other.filter == "" - } - panic("should be unreachable") -} - // baseDirFromPrefix will return the base directory given an object path. // For example an object with name prefix/folder/object.ext will return `prefix/folder/`. func baseDirFromPrefix(prefix string) string { @@ -218,13 +115,17 @@ func (m *metacache) update(update metacache) { if m.status == scanStateStarted && update.status == scanStateSuccess { m.ended = UTCNow() - m.endedCycle = update.endedCycle } if m.status == scanStateStarted && update.status != scanStateStarted { m.status = update.status } + if m.status == scanStateStarted && time.Since(m.lastHandout) > 15*time.Minute { + m.status = scanStateError + m.error = "client not seen" + } + if m.error == "" && update.error != "" { m.error = update.error m.status = scanStateError diff --git a/cmd/metacache_gen.go b/cmd/metacache_gen.go index ef2577490..eb648f0b6 100644 --- a/cmd/metacache_gen.go +++ b/cmd/metacache_gen.go @@ -100,18 +100,6 @@ func (z *metacache) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "lastHandout") return } - case "stc": - z.startedCycle, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "startedCycle") - return - } - case "endc": - z.endedCycle, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "endedCycle") - return - } case "v": z.dataVersion, err = dc.ReadUint8() if err != nil { @@ -131,9 +119,9 @@ func (z *metacache) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 15 + // map header, size 13 // write "id" - err = en.Append(0x8f, 0xa2, 0x69, 0x64) + err = en.Append(0x8d, 0xa2, 0x69, 0x64) if err != nil { return } @@ -252,26 +240,6 @@ func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "lastHandout") return } - // write "stc" - err = en.Append(0xa3, 0x73, 0x74, 0x63) - if err != nil { - return - } - err = en.WriteUint64(z.startedCycle) - if err != nil { - err = msgp.WrapError(err, "startedCycle") - return - } - // write "endc" - err = en.Append(0xa4, 0x65, 0x6e, 0x64, 0x63) - if err != nil { - return - } - err = en.WriteUint64(z.endedCycle) - if err != nil { - err = msgp.WrapError(err, "endedCycle") - return - } // write "v" err = en.Append(0xa1, 0x76) if err != nil { @@ -288,9 +256,9 @@ func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *metacache) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 15 + // map header, size 13 // string "id" - o = append(o, 0x8f, 0xa2, 0x69, 0x64) + o = append(o, 0x8d, 0xa2, 0x69, 0x64) o = msgp.AppendString(o, z.id) // string "b" o = append(o, 0xa1, 0x62) @@ -325,12 +293,6 @@ func (z *metacache) MarshalMsg(b []byte) (o []byte, err error) { // string "lh" o = append(o, 0xa2, 0x6c, 0x68) o = msgp.AppendTime(o, z.lastHandout) - // string "stc" - o = append(o, 0xa3, 0x73, 0x74, 0x63) - o = msgp.AppendUint64(o, z.startedCycle) - // string "endc" - o = append(o, 0xa4, 0x65, 0x6e, 0x64, 0x63) - o = msgp.AppendUint64(o, z.endedCycle) // string "v" o = append(o, 0xa1, 0x76) o = msgp.AppendUint8(o, z.dataVersion) @@ -431,18 +393,6 @@ func (z *metacache) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "lastHandout") return } - case "stc": - z.startedCycle, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "startedCycle") - return - } - case "endc": - z.endedCycle, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "endedCycle") - return - } case "v": z.dataVersion, bts, err = msgp.ReadUint8Bytes(bts) if err != nil { @@ -463,7 +413,7 @@ func (z *metacache) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *metacache) Msgsize() (s int) { - s = 1 + 3 + msgp.StringPrefixSize + len(z.id) + 2 + msgp.StringPrefixSize + len(z.bucket) + 5 + msgp.StringPrefixSize + len(z.root) + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.filter) + 5 + msgp.Uint8Size + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.error) + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 2 + msgp.TimeSize + 3 + msgp.TimeSize + 4 + msgp.Uint64Size + 5 + msgp.Uint64Size + 2 + msgp.Uint8Size + s = 1 + 3 + msgp.StringPrefixSize + len(z.id) + 2 + msgp.StringPrefixSize + len(z.bucket) + 5 + msgp.StringPrefixSize + len(z.root) + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.filter) + 5 + msgp.Uint8Size + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.error) + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 2 + msgp.TimeSize + 3 + msgp.TimeSize + 2 + msgp.Uint8Size return } diff --git a/cmd/metacache_test.go b/cmd/metacache_test.go index 345fe6278..882b76f99 100644 --- a/cmd/metacache_test.go +++ b/cmd/metacache_test.go @@ -37,8 +37,6 @@ var metaCacheTestset = []metacache{ ended: metaCacheTestsetTimestamp.Add(time.Minute), lastUpdate: metaCacheTestsetTimestamp.Add(time.Minute), lastHandout: metaCacheTestsetTimestamp, - startedCycle: 10, - endedCycle: 10, dataVersion: metacacheStreamVersion, }, 1: { @@ -53,8 +51,6 @@ var metaCacheTestset = []metacache{ ended: metaCacheTestsetTimestamp.Add(time.Minute), lastUpdate: metaCacheTestsetTimestamp.Add(time.Minute), lastHandout: metaCacheTestsetTimestamp, - startedCycle: 10, - endedCycle: 10, dataVersion: metacacheStreamVersion, }, 2: { @@ -69,8 +65,6 @@ var metaCacheTestset = []metacache{ ended: metaCacheTestsetTimestamp, lastUpdate: metaCacheTestsetTimestamp, lastHandout: metaCacheTestsetTimestamp, - startedCycle: 10, - endedCycle: 10, dataVersion: metacacheStreamVersion, }, 3: { @@ -85,8 +79,6 @@ var metaCacheTestset = []metacache{ ended: metaCacheTestsetTimestamp.Add(-20 * time.Minute), lastUpdate: metaCacheTestsetTimestamp.Add(-20 * time.Minute), lastHandout: metaCacheTestsetTimestamp.Add(-20 * time.Minute), - startedCycle: 10, - endedCycle: 10, dataVersion: metacacheStreamVersion, }, 4: { @@ -101,8 +93,6 @@ var metaCacheTestset = []metacache{ ended: time.Time{}, lastUpdate: metaCacheTestsetTimestamp.Add(-time.Minute), lastHandout: metaCacheTestsetTimestamp, - startedCycle: 10, - endedCycle: 10, dataVersion: metacacheStreamVersion, }, 5: { @@ -117,8 +107,6 @@ var metaCacheTestset = []metacache{ ended: metaCacheTestsetTimestamp.Add(time.Minute), lastUpdate: metaCacheTestsetTimestamp.Add(time.Minute), lastHandout: metaCacheTestsetTimestamp, - startedCycle: 10, - endedCycle: 10, dataVersion: metacacheStreamVersion, }, 6: { @@ -133,8 +121,6 @@ var metaCacheTestset = []metacache{ ended: metaCacheTestsetTimestamp.Add(-8 * time.Minute), lastUpdate: metaCacheTestsetTimestamp.Add(-8 * time.Minute), lastHandout: metaCacheTestsetTimestamp, - startedCycle: 6, - endedCycle: 8, dataVersion: metacacheStreamVersion, }, 7: { @@ -149,8 +135,6 @@ var metaCacheTestset = []metacache{ ended: time.Time{}, lastUpdate: metaCacheTestsetTimestamp.Add(-1 * time.Minute), lastHandout: metaCacheTestsetTimestamp, - startedCycle: 10, - endedCycle: 0, dataVersion: metacacheStreamVersion, }, 8: { @@ -165,8 +149,6 @@ var metaCacheTestset = []metacache{ ended: metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour), lastUpdate: metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour), lastHandout: metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour), - startedCycle: 10, - endedCycle: 10, dataVersion: metacacheStreamVersion, }, } @@ -222,45 +204,6 @@ func Test_baseDirFromPrefix(t *testing.T) { } } -func Test_metacache_canBeReplacedBy(t *testing.T) { - testAgainst := metacache{ - id: "case-1-modified", - bucket: "bucket", - root: "folder/prefix", - recursive: true, - status: scanStateSuccess, - fileNotFound: false, - error: "", - started: metaCacheTestsetTimestamp.Add(time.Minute), - ended: metaCacheTestsetTimestamp.Add(2 * time.Minute), - lastUpdate: metaCacheTestsetTimestamp.Add(2 * time.Minute), - lastHandout: metaCacheTestsetTimestamp.Add(time.Minute), - startedCycle: 10, - endedCycle: 10, - dataVersion: metacacheStreamVersion, - } - wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: true, 5: false, 6: true, 7: false, 8: false} - - for i, tt := range metaCacheTestset { - t.Run(tt.id, func(t *testing.T) { - var want bool - if i >= len(wantResults) { - t.Logf("no expected result for test #%d", i) - } else { - want = wantResults[i] - } - // Add an hour, otherwise it will never be replaced. - // We operated on a copy. - tt.lastHandout = tt.lastHandout.Add(-2 * time.Hour) - tt.lastUpdate = tt.lastHandout.Add(-2 * time.Hour) - got := tt.canBeReplacedBy(&testAgainst) - if got != want { - t.Errorf("#%d: want %v, got %v", i, want, got) - } - }) - } -} - func Test_metacache_finished(t *testing.T) { wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false, 8: true} @@ -282,7 +225,8 @@ func Test_metacache_finished(t *testing.T) { } func Test_metacache_worthKeeping(t *testing.T) { - wantResults := []bool{0: true, 1: true, 2: true, 3: false, 4: false, 5: true, 6: false, 7: false, 8: false} + // TODO: Update... + wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false, 8: false} for i, tt := range metaCacheTestset { t.Run(tt.id, func(t *testing.T) { @@ -293,7 +237,7 @@ func Test_metacache_worthKeeping(t *testing.T) { want = wantResults[i] } - got := tt.worthKeeping(7 + dataUsageUpdateDirCycles) + got := tt.worthKeeping() if got != want { t.Errorf("#%d: want %v, got %v", i, want, got) } diff --git a/cmd/notification.go b/cmd/notification.go index 49f0971f3..fbbc77a33 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -498,55 +498,6 @@ func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint6 return bf, nil } -// findEarliestCleanBloomFilter will find the earliest bloom filter across the cluster -// where the directory is clean. -// Due to how objects are stored this can include object names. -func (sys *NotificationSys) findEarliestCleanBloomFilter(ctx context.Context, dir string) uint64 { - - // Load initial state from local... - current := intDataUpdateTracker.current() - best := intDataUpdateTracker.latestWithDir(dir) - if best == current { - // If the current is dirty no need to check others. - return current - } - - var req = bloomFilterRequest{ - Current: 0, - Oldest: best, - OldestClean: dir, - } - - var mu sync.Mutex - g := errgroup.WithNErrs(len(sys.peerClients)) - for idx, client := range sys.peerClients { - if client == nil { - continue - } - client := client - g.Go(func() error { - serverBF, err := client.cycleServerBloomFilter(ctx, req) - - // Keep lock while checking result. - mu.Lock() - defer mu.Unlock() - - if err != nil { - // Error, don't assume clean. - best = current - logger.LogIf(ctx, err) - return nil - } - if serverBF.OldestIdx > best { - best = serverBF.OldestIdx - } - return nil - }, idx) - } - g.Wait() - return best -} - var errPeerNotReachable = errors.New("peer is not reachable") // GetLocks - makes GetLocks RPC call on all peers. diff --git a/internal/config/api/api.go b/internal/config/api/api.go index 6d529b1c3..d19e716c1 100644 --- a/internal/config/api/api.go +++ b/internal/config/api/api.go @@ -36,7 +36,6 @@ const ( apiCorsAllowOrigin = "cors_allow_origin" apiRemoteTransportDeadline = "remote_transport_deadline" apiListQuorum = "list_quorum" - apiExtendListCacheLife = "extend_list_cache_life" apiReplicationWorkers = "replication_workers" apiReplicationFailedWorkers = "replication_failed_workers" @@ -46,7 +45,6 @@ const ( EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN" EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE" EnvAPIListQuorum = "MINIO_API_LIST_QUORUM" - EnvAPIExtendListCacheLife = "MINIO_API_EXTEND_LIST_CACHE_LIFE" EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS" EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS" @@ -85,10 +83,6 @@ var ( Key: apiListQuorum, Value: "optimal", }, - config.KV{ - Key: apiExtendListCacheLife, - Value: "0s", - }, config.KV{ Key: apiReplicationWorkers, Value: "250", @@ -108,7 +102,6 @@ type Config struct { CorsAllowOrigin []string `json:"cors_allow_origin"` RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"` ListQuorum string `json:"list_quorum"` - ExtendListLife time.Duration `json:"extend_list_cache_life"` ReplicationWorkers int `json:"replication_workers"` ReplicationFailedWorkers int `json:"replication_failed_workers"` } @@ -144,6 +137,7 @@ func (sCfg Config) GetListQuorum() int { func LookupConfig(kvs config.KVS) (cfg Config, err error) { // remove this since we have removed this already. kvs.Delete(apiReadyDeadline) + kvs.Delete("extend_list_cache_life") if err = config.CheckValidKeys(config.APISubSys, kvs, DefaultKVS); err != nil { return cfg, err @@ -183,11 +177,6 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { return cfg, errors.New("invalid value for list strict quorum") } - listLife, err := time.ParseDuration(env.Get(EnvAPIExtendListCacheLife, kvs.Get(apiExtendListCacheLife))) - if err != nil { - return cfg, err - } - replicationWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationWorkers, kvs.Get(apiReplicationWorkers))) if err != nil { return cfg, err @@ -213,7 +202,6 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { CorsAllowOrigin: corsAllowOrigin, RemoteTransportDeadline: remoteTransportDeadline, ListQuorum: listQuorum, - ExtendListLife: listLife, ReplicationWorkers: replicationWorkers, ReplicationFailedWorkers: replicationFailedWorkers, }, nil