diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go
index 3e9e6649f..ea4d2c092 100644
--- a/cmd/data-scanner.go
+++ b/cmd/data-scanner.go
@@ -18,7 +18,6 @@
package cmd
import (
- "bytes"
"context"
"encoding/binary"
"encoding/json"
@@ -34,7 +33,6 @@ import (
"sync"
"time"
- "github.com/bits-and-blooms/bloom/v3"
"github.com/dustin/go-humanize"
"github.com/minio/madmin-go/v2"
"github.com/minio/minio/internal/bucket/lifecycle"
@@ -162,7 +160,6 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
// Load current bloom cycle
var cycleInfo currentScannerCycle
- cycleInfo.next = intDataUpdateTracker.current() + 1
buf, _ := readConfig(ctx, objAPI, dataUsageBloomNamePath)
if len(buf) == 8 {
@@ -207,9 +204,7 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
// Wait before starting next cycle and wait on startup.
results := make(chan DataUsageInfo, 1)
go storeDataUsageInBackend(ctx, objAPI, results)
- bf, err := globalNotificationSys.updateBloomFilter(ctx, cycleInfo.current)
- logger.LogIf(ctx, err)
- err = objAPI.NSScanner(ctx, bf, results, uint32(cycleInfo.current), scanMode)
+ err := objAPI.NSScanner(ctx, results, uint32(cycleInfo.current), scanMode)
logger.LogIf(ctx, err)
res := map[string]string{"cycle": fmt.Sprint(cycleInfo.current)}
if err != nil {
@@ -248,10 +243,8 @@ type folderScanner struct {
oldCache dataUsageCache
newCache dataUsageCache
updateCache dataUsageCache
- withFilter *bloomFilter
dataUsageScannerDebug bool
- healFolderInclude uint32 // Include a clean folder one in n cycles.
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
scanMode madmin.HealScanMode
@@ -310,8 +303,6 @@ type folderScanner struct {
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
// If the supplied context is canceled the function will return at the first chance.
func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) {
- logPrefix := color.Green("data-usage: ")
-
switch cache.Info.Name {
case "", dataUsageRoot:
return cache, errors.New("internal error: root scan attempted")
@@ -325,8 +316,7 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
oldCache: cache,
newCache: dataUsageCache{Info: cache.Info},
updateCache: dataUsageCache{Info: cache.Info},
- dataUsageScannerDebug: intDataUpdateTracker.debug,
- healFolderInclude: 0,
+ dataUsageScannerDebug: false,
healObjectSelect: 0,
scanMode: scanMode,
updates: cache.Info.updates,
@@ -349,19 +339,9 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
// Enable healing in XL mode.
if globalIsErasure && !cache.Info.SkipHealing {
- // Include a clean folder one in n cycles.
- s.healFolderInclude = healFolderIncludeProb
// Do a heal check on an object once every n cycles. Must divide into healFolderInclude
s.healObjectSelect = healObjectSelectProb
}
- if len(cache.Info.BloomFilter) > 0 {
- s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
- _, err := s.withFilter.ReadFrom(bytes.NewReader(cache.Info.BloomFilter))
- if err != nil {
- logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter")
- s.withFilter = nil
- }
- }
done := ctx.Done()
@@ -418,14 +398,12 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
return ctx.Err()
default:
}
- existing, ok := f.oldCache.Cache[thisHash.Key()]
var abandonedChildren dataUsageHashMap
if !into.Compacted {
abandonedChildren = f.oldCache.findChildrenCopy(thisHash)
}
// If there are lifecycle rules for the prefix, remove the filter.
- filter := f.withFilter
_, prefix := path2BucketObjectWithBasePath(f.root, folder.name)
var activeLifeCycle *lifecycle.Lifecycle
if f.oldCache.Info.lifeCycle != nil && f.oldCache.Info.lifeCycle.HasActiveRules(prefix) {
@@ -433,33 +411,13 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
console.Debugf(scannerLogPrefix+" Prefix %q has active rules\n", prefix)
}
activeLifeCycle = f.oldCache.Info.lifeCycle
- filter = nil
}
// If there are replication rules for the prefix, remove the filter.
var replicationCfg replicationConfig
if !f.oldCache.Info.replication.Empty() && f.oldCache.Info.replication.Config.HasActiveRules(prefix, true) {
replicationCfg = f.oldCache.Info.replication
- filter = nil
}
// Check if we can skip it due to bloom filter...
- if filter != nil && ok && existing.Compacted {
- // If folder isn't in filter and we have data, skip it completely.
- if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
- if f.healObjectSelect == 0 || !thisHash.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healFolderInclude/folder.objectHealProbDiv) {
- f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
- f.updateCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
- if f.dataUsageScannerDebug {
- console.Debugf(scannerLogPrefix+" Skipping non-updated folder: %v\n", folder.name)
- }
- return nil
- }
- if f.dataUsageScannerDebug {
- console.Debugf(scannerLogPrefix+" Adding non-updated folder to heal check: %v\n", folder.name)
- }
- // If probability was already scannerHealFolderInclude, keep it.
- folder.objectHealProbDiv = f.healFolderInclude
- }
- }
scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder)
var existingFolders, newFolders []cachedFolder
@@ -673,13 +631,10 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
// and the entry itself is compacted.
if !into.Compacted && f.oldCache.isCompacted(h) {
if !h.mod(f.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
- if f.healObjectSelect == 0 || !h.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healFolderInclude/folder.objectHealProbDiv) {
- // Transfer and add as child...
- f.newCache.copyWithChildren(&f.oldCache, h, folder.parent)
- into.addChild(h)
- continue
- }
- folder.objectHealProbDiv = f.healFolderInclude
+ // Transfer and add as child...
+ f.newCache.copyWithChildren(&f.oldCache, h, folder.parent)
+ into.addChild(h)
+ continue
}
}
f.updateCurrentPath(folder.name)
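
Note: with the bloom-filter gating removed above, folder rescans rely solely on the cycle-based selection of h.mod(f.oldCache.Info.NextCycle, dataUsageUpdateDirCycles). A minimal, self-contained sketch of that "once every n cycles" idea, assuming mod hashes the entry key into a residue class (selectedThisCycle is illustrative, not MinIO's actual helper):

package main

import (
	"fmt"
	"hash/fnv"
)

// selectedThisCycle reports whether key falls into the residue class for
// this cycle, so each key is picked once every everyN cycles.
func selectedThisCycle(key string, cycle, everyN uint32) bool {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32()%everyN == cycle%everyN
}

func main() {
	for cycle := uint32(0); cycle < 8; cycle++ {
		fmt.Println(cycle, selectedThisCycle("bucket/prefix", cycle, 4))
	}
}
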
diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go
deleted file mode 100644
index dcc6edf74..000000000
--- a/cmd/data-update-tracker.go
+++ /dev/null
@@ -1,682 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package cmd
-
-import (
- "bufio"
- "bytes"
- "context"
- "encoding/binary"
- "errors"
- "io"
- "os"
- "path"
- "sort"
- "strings"
- "sync"
- "time"
-
- "github.com/bits-and-blooms/bloom/v3"
- "github.com/minio/minio/internal/color"
- "github.com/minio/minio/internal/logger"
- "github.com/minio/pkg/console"
-)
-
-const (
- // Estimate bloom filter size. With this many items
- dataUpdateTrackerEstItems = 200000
- // ... we want this false positive rate:
- dataUpdateTrackerFP = 0.1
- dataUpdateTrackerQueueSize = 0
-
- dataUpdateTrackerFilename = dataUsageBucket + SlashSeparator + ".tracker.bin"
- dataUpdateTrackerVersion = 7
- dataUpdateTrackerSaveInterval = 5 * time.Minute
-)
-
-var intDataUpdateTracker *dataUpdateTracker
-
-func init() {
- intDataUpdateTracker = newDataUpdateTracker()
-}
-
-type dataUpdateTracker struct {
- mu sync.Mutex
- input chan string
- save chan struct{}
- debug bool
- saveExited chan struct{}
- dirty bool
-
- Current dataUpdateFilter
- History dataUpdateTrackerHistory
- Saved time.Time
-}
-
-// newDataUpdateTracker returns a dataUpdateTracker with default settings.
-func newDataUpdateTracker() *dataUpdateTracker {
- d := &dataUpdateTracker{
- Current: dataUpdateFilter{
- idx: 1,
- },
- debug: serverDebugLog,
- input: make(chan string, dataUpdateTrackerQueueSize),
- save: make(chan struct{}, 1),
- saveExited: make(chan struct{}),
- }
- d.Current.bf = d.newBloomFilter()
- d.dirty = true
- return d
-}
-
-type dataUpdateTrackerHistory []dataUpdateFilter
-
-type dataUpdateFilter struct {
- idx uint64
- bf bloomFilter
-}
-
-type bloomFilter struct {
- *bloom.BloomFilter
-}
-
-// emptyBloomFilter returns an empty bloom filter.
-func emptyBloomFilter() bloomFilter {
- return bloomFilter{BloomFilter: &bloom.BloomFilter{}}
-}
-
-// containsDir returns whether the bloom filter contains a directory.
-// Note that objects in XL mode are also considered directories.
-func (b bloomFilter) containsDir(in string) bool {
- split := splitPathDeterministic(path.Clean(in))
-
- if len(split) == 0 {
- return false
- }
- return b.TestString(hashPath(path.Join(split...)).String())
-}
-
-// bytes returns the bloom filter serialized as a byte slice.
-func (b *bloomFilter) bytes() []byte {
- if b == nil || b.BloomFilter == nil {
- return nil
- }
- var buf bytes.Buffer
- _, err := b.WriteTo(&buf)
- if err != nil {
- logger.LogIf(GlobalContext, err)
- return nil
- }
- return buf.Bytes()
-}
-
-// sort the dataUpdateTrackerHistory, newest first.
-// Returns whether the history is complete.
-func (d dataUpdateTrackerHistory) sort() bool {
- if len(d) == 0 {
- return true
- }
- sort.Slice(d, func(i, j int) bool {
- return d[i].idx > d[j].idx
- })
- return d[0].idx-d[len(d)-1].idx == uint64(len(d))
-}
-
-// removeOlderThan will remove entries older than index 'n'.
-func (d *dataUpdateTrackerHistory) removeOlderThan(n uint64) {
- d.sort()
- dd := *d
- end := len(dd)
- for i := end - 1; i >= 0; i-- {
- if dd[i].idx < n {
- end = i
- }
- }
- dd = dd[:end]
- *d = dd
-}
-
-// newBloomFilter returns a new bloom filter with default settings.
-func (d *dataUpdateTracker) newBloomFilter() bloomFilter {
- return bloomFilter{bloom.NewWithEstimates(dataUpdateTrackerEstItems, dataUpdateTrackerFP)}
-}
-
-// current returns the current index.
-func (d *dataUpdateTracker) current() uint64 {
- d.mu.Lock()
- defer d.mu.Unlock()
- return d.Current.idx
-}
-
-// latestWithDir returns the highest index that contains the directory.
-// This means that any cycle higher than this does NOT contain the entry.
-func (d *dataUpdateTracker) latestWithDir(dir string) uint64 {
- dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:")
- bucket, _ := path2BucketObjectWithBasePath("", dir)
- if bucket == "" {
- if d.debug && len(dir) > 0 {
- console.Debugf(dateUpdateTrackerLogPrefix+" no bucket (%s)\n", dir)
- }
- return d.current()
- }
- if isReservedOrInvalidBucket(bucket, false) {
- return d.current()
- }
-
- d.mu.Lock()
- defer d.mu.Unlock()
- if d.Current.bf.containsDir(dir) || d.Current.idx == 0 {
- return d.Current.idx
- }
- if d.debug {
- console.Debugf(dateUpdateTrackerLogPrefix+" current bloom does NOT contains dir %s\n", dir)
- }
-
- idx := d.Current.idx - 1
- for {
- f := d.History.find(idx)
- if f == nil || f.bf.containsDir(dir) || idx == 0 {
- break
- }
- idx--
- }
- return idx
-}
-
-// start will load the current data from the drives, start collecting information, and
-// start a saver goroutine.
-// All of these will exit when the context is canceled.
-func (d *dataUpdateTracker) start(ctx context.Context, drives ...string) {
- if len(drives) == 0 {
- logger.LogIf(ctx, errors.New("dataUpdateTracker.start: No local drives specified"))
- return
- }
- d.load(ctx, drives...)
- go d.startCollector(ctx)
- // startSaver will unlock.
- d.mu.Lock()
- go d.startSaver(ctx, dataUpdateTrackerSaveInterval, drives...)
-}
-
-// load will attempt to load data tracking information from the supplied drives.
-// The data will only be loaded if d.Saved is older than the one found on disk.
-// The newest working cache will be kept in d.
-// If no valid data usage tracker can be found d will remain unchanged.
-// If object is shared the caller should lock it.
-func (d *dataUpdateTracker) load(ctx context.Context, drives ...string) {
- if len(drives) == 0 {
- logger.LogIf(ctx, errors.New("dataUpdateTracker.load: No local drives specified"))
- return
- }
- for _, drive := range drives {
-
- cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename)
- f, err := OpenFile(cacheFormatPath, readMode, 0o666)
- if err != nil {
- if osIsNotExist(err) {
- continue
- }
- logger.LogIf(ctx, err)
- continue
- }
- err = d.deserialize(f, d.Saved)
- if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
- logger.LogIf(ctx, err)
- }
- f.Close()
- }
-}
-
-// startSaver will start a saver that will write d to all supplied drives at specific intervals.
-// 'd' must be write locked when started and will be unlocked.
-// The saver will save and exit when supplied context is closed.
-func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives ...string) {
- if len(drives) == 0 {
- return
- }
-
- saveNow := d.save
- exited := make(chan struct{})
- d.saveExited = exited
- d.mu.Unlock()
- t := time.NewTicker(interval)
- defer t.Stop()
- defer close(exited)
- var buf bytes.Buffer
- for {
- var exit bool
- select {
- case <-ctx.Done():
- exit = true
- case <-t.C:
- case <-saveNow:
- }
- buf.Reset()
- d.mu.Lock()
- if !d.dirty {
- d.mu.Unlock()
- if exit {
- return
- }
- continue
- }
- d.Saved = UTCNow()
- err := d.serialize(&buf)
- if d.debug {
- console.Debugf(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v\n", buf.Len(), d.Current.idx)
- }
- d.dirty = false
- d.mu.Unlock()
- if err != nil {
- logger.LogIf(ctx, err, "Error serializing usage tracker data")
- if exit {
- return
- }
- continue
- }
- if buf.Len() == 0 {
- logger.LogIf(ctx, errors.New("zero sized output, skipping save"))
- continue
- }
- for _, drive := range drives {
- cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename)
- err := os.WriteFile(cacheFormatPath, buf.Bytes(), os.ModePerm)
- if err != nil {
- if osIsNotExist(err) {
- continue
- }
- logger.LogIf(ctx, err)
- continue
- }
- }
- if exit {
- return
- }
- }
-}
-
-// serialize all data in d to dst.
-// Caller should hold lock if d is expected to be shared.
-// If an error is returned, there will likely be partial data written to dst.
-func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) {
- ctx := GlobalContext
- var tmp [8]byte
- o := bufio.NewWriter(dst)
- defer func() {
- if err == nil {
- err = o.Flush()
- }
- }()
-
- // Version
- if err := o.WriteByte(dataUpdateTrackerVersion); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
- // Timestamp.
- binary.LittleEndian.PutUint64(tmp[:], uint64(d.Saved.Unix()))
- if _, err := o.Write(tmp[:]); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
-
- // Current
- binary.LittleEndian.PutUint64(tmp[:], d.Current.idx)
- if _, err := o.Write(tmp[:]); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
-
- if _, err := d.Current.bf.WriteTo(o); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
-
- // History
- binary.LittleEndian.PutUint64(tmp[:], uint64(len(d.History)))
- if _, err := o.Write(tmp[:]); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
-
- for _, bf := range d.History {
- // Current
- binary.LittleEndian.PutUint64(tmp[:], bf.idx)
- if _, err := o.Write(tmp[:]); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
-
- if _, err := bf.bf.WriteTo(o); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
- }
- return nil
-}
-
-// deserialize will deserialize the supplied input if the input is newer than the supplied time.
-func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) error {
- ctx := GlobalContext
- var dst dataUpdateTracker
- var tmp [8]byte
-
- // Version
- if _, err := io.ReadFull(src, tmp[:1]); err != nil {
- if d.debug {
- if err != io.EOF {
- logger.LogIf(ctx, err)
- }
- }
- return err
- }
- switch tmp[0] {
- case 1, 2, 3, 4, 5, 6:
- if intDataUpdateTracker.debug {
- console.Debugln(color.Green("dataUpdateTracker: ") + "deprecated data version, updating.")
- }
- return nil
- case dataUpdateTrackerVersion:
- default:
- return errors.New("dataUpdateTracker: Unknown data version")
- }
- // Timestamp.
- if _, err := io.ReadFull(src, tmp[:8]); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
- t := time.Unix(int64(binary.LittleEndian.Uint64(tmp[:])), 0)
- if !t.After(newerThan) {
- return nil
- }
-
- // Current
- if _, err := io.ReadFull(src, tmp[:8]); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
- dst.Current.idx = binary.LittleEndian.Uint64(tmp[:])
- dst.Current.bf = emptyBloomFilter()
- if _, err := dst.Current.bf.ReadFrom(src); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
-
- // History
- if _, err := io.ReadFull(src, tmp[:8]); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
- n := binary.LittleEndian.Uint64(tmp[:])
- dst.History = make(dataUpdateTrackerHistory, int(n))
- for i, e := range dst.History {
- if _, err := io.ReadFull(src, tmp[:8]); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
- e.idx = binary.LittleEndian.Uint64(tmp[:])
- e.bf = emptyBloomFilter()
- if _, err := e.bf.ReadFrom(src); err != nil {
- if d.debug {
- logger.LogIf(ctx, err)
- }
- return err
- }
- dst.History[i] = e
- }
- // Ignore what remains on the stream.
- // Update d:
- d.mu.Lock()
- defer d.mu.Unlock()
- d.Current = dst.Current
- d.History = dst.History
- d.Saved = dst.Saved
- return nil
-}
-
-// start a collector that picks up entries from objectUpdatedCh
-// and adds them to the current bloom filter.
-func (d *dataUpdateTracker) startCollector(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- return
- case in := <-d.input:
- bucket, _ := path2BucketObjectWithBasePath("", in)
- if bucket == "" {
- if d.debug && len(in) > 0 {
- console.Debugf(color.Green("dataUpdateTracker:")+" no bucket (%s)\n", in)
- }
- continue
- }
-
- if isReservedOrInvalidBucket(bucket, false) {
- continue
- }
- split := splitPathDeterministic(in)
-
- // Add all paths until done.
- d.mu.Lock()
- for i := range split {
- d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
- }
- d.dirty = d.dirty || len(split) > 0
- d.mu.Unlock()
- }
- }
-}
-
-// markDirty adds the supplied path to the current bloom filter.
-func (d *dataUpdateTracker) markDirty(bucket, prefix string) {
- dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:")
- if bucket == "" && d.debug {
- console.Debugf(dateUpdateTrackerLogPrefix + " no bucket specified\n")
- return
- }
-
- if isReservedOrInvalidBucket(bucket, false) && d.debug {
- return
- }
- split := splitPathDeterministic(pathJoin(bucket, prefix))
-
- // Add all paths until done.
- d.mu.Lock()
- for i := range split {
- d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
- }
- d.dirty = d.dirty || len(split) > 0
- d.mu.Unlock()
-}
-
-// find entry with specified index.
-// Returns nil if not found.
-func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter {
- for _, f := range d {
- if f.idx == idx {
- return &f
- }
- }
- return nil
-}
-
-// filterFrom will return a combined bloom filter.
-func (d *dataUpdateTracker) filterFrom(ctx context.Context, oldest, newest uint64) *bloomFilterResponse {
- bf := d.newBloomFilter()
- bfr := bloomFilterResponse{
- OldestIdx: oldest,
- CurrentIdx: d.Current.idx,
- Complete: true,
- }
- // Loop through each index requested.
- for idx := oldest; idx <= newest; idx++ {
- v := d.History.find(idx)
- if v == nil {
- if d.Current.idx == idx {
- // Merge current.
- err := bf.Merge(d.Current.bf.BloomFilter)
- logger.LogIf(ctx, err)
- if err != nil {
- bfr.Complete = false
- }
- continue
- }
- bfr.Complete = false
- bfr.OldestIdx = idx + 1
- continue
- }
-
- err := bf.Merge(v.bf.BloomFilter)
- if err != nil {
- bfr.Complete = false
- logger.LogIf(ctx, err)
- continue
- }
- bfr.NewestIdx = idx
- }
- var dst bytes.Buffer
- _, err := bf.WriteTo(&dst)
- if err != nil {
- logger.LogIf(ctx, err)
- return nil
- }
- bfr.Filter = dst.Bytes()
-
- return &bfr
-}
-
-// cycleFilter will cycle the bloom filter to start recording to index y if not already.
-// The response will contain a bloom filter starting at index x up to, but not including index y.
-// If y is 0, the response will not update y, but return the currently recorded information
-// from the oldest (unless 0, then it will be all) until and including current y.
-func (d *dataUpdateTracker) cycleFilter(ctx context.Context, req bloomFilterRequest) (*bloomFilterResponse, error) {
- if req.OldestClean != "" {
- return &bloomFilterResponse{OldestIdx: d.latestWithDir(req.OldestClean)}, nil
- }
- current := req.Current
- oldest := req.Oldest
- d.mu.Lock()
- defer d.mu.Unlock()
- if current == 0 {
- if len(d.History) == 0 {
- return d.filterFrom(ctx, d.Current.idx, d.Current.idx), nil
- }
- d.History.sort()
- if oldest == 0 {
- oldest = d.History[len(d.History)-1].idx
- }
- return d.filterFrom(ctx, oldest, d.Current.idx), nil
- }
-
- // Move current to history if new one requested
- if d.Current.idx != current {
- d.dirty = true
- if d.debug {
- console.Debugf(color.Green("dataUpdateTracker:")+" cycle bloom filter: %v -> %v\n", d.Current.idx, current)
- }
-
- d.History = append(d.History, d.Current)
- d.Current.idx = current
- d.Current.bf = d.newBloomFilter()
- select {
- case d.save <- struct{}{}:
- default:
- }
- }
- d.History.removeOlderThan(oldest)
- return d.filterFrom(ctx, oldest, current), nil
-}
-
-// splitPathDeterministic will split the provided relative path
-// deterministically and return up to the first 3 elements of the path.
-// slash and dot prefixes are removed.
-// Trailing slashes are removed.
-// Returns 0 length if no parts are found after trimming.
-func splitPathDeterministic(in string) []string {
- split := strings.Split(decodeDirObject(in), SlashSeparator)
-
- // Trim empty start/end
- for len(split) > 0 {
- if len(split[0]) > 0 && split[0] != "." {
- break
- }
- split = split[1:]
- }
- for len(split) > 0 {
- if len(split[len(split)-1]) > 0 {
- break
- }
- split = split[:len(split)-1]
- }
-
- return split
-}
-
-// bloomFilterRequest request bloom filters.
-// Current index will be updated to current and entries back to Oldest is returned.
-type bloomFilterRequest struct {
- Oldest uint64
- Current uint64
- // If set the oldest clean version will be returned in OldestIdx
- // and the rest of the request will be ignored.
- OldestClean string
-}
-
-type bloomFilterResponse struct {
- // Current index being written to.
- CurrentIdx uint64
- // Oldest index in the returned bloom filter.
- OldestIdx uint64
- // Newest Index in the returned bloom filter.
- NewestIdx uint64
- // Are all indexes between oldest and newest filled?
- Complete bool
- // Binary data of the bloom filter.
- Filter []byte
-}
-
-// NSUpdated indicates namespace has been updated.
-// The function will block until the entry has been picked up.
-func NSUpdated(bucket, prefix string) {
- if intDataUpdateTracker != nil {
- intDataUpdateTracker.markDirty(bucket, prefix)
- }
-}
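
Note: for reference, the core idea the deleted tracker implemented: every updated path was split into its prefixes and each prefix added to a bloom filter, so the scanner could later ask whether anything under a directory had changed, with no false negatives. A self-contained sketch using the same bits-and-blooms library and the same size estimates (the real code additionally hashed each prefix via hashPath before insertion; markDirty here is illustrative):

package main

import (
	"fmt"
	"path"
	"strings"

	"github.com/bits-and-blooms/bloom/v3"
)

// markDirty adds every prefix of p to the filter: bucket, bucket/dir,
// bucket/dir/file.txt. (The deleted code hashed each prefix first.)
func markDirty(bf *bloom.BloomFilter, p string) {
	split := strings.Split(path.Clean(p), "/")
	for i := range split {
		bf.AddString(path.Join(split[:i+1]...))
	}
}

func main() {
	// Same estimates as the deleted constants: 200000 items, 10% FP rate.
	bf := bloom.NewWithEstimates(200000, 0.1)
	markDirty(bf, "bucket/directory/file.txt")
	fmt.Println(bf.TestString("bucket/directory")) // true: no false negatives
	fmt.Println(bf.TestString("otherbucket"))      // almost certainly false
}
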
diff --git a/cmd/data-update-tracker_test.go b/cmd/data-update-tracker_test.go
deleted file mode 100644
index 19b819ab0..000000000
--- a/cmd/data-update-tracker_test.go
+++ /dev/null
@@ -1,299 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package cmd
-
-import (
- "bytes"
- "context"
- "fmt"
- "math/rand"
- "os"
- "path"
- "path/filepath"
- "sync"
- "testing"
-
- "github.com/minio/minio/internal/logger"
- "github.com/minio/minio/internal/logger/target/types"
- "github.com/minio/pkg/logger/message/log"
-)
-
-type testLoggerI interface {
- Helper()
- Log(args ...interface{})
-}
-
-type testingLogger struct {
- mu sync.Mutex
- t testLoggerI
-}
-
-func (t *testingLogger) Endpoint() string {
- return ""
-}
-
-func (t *testingLogger) String() string {
- return ""
-}
-
-func (t *testingLogger) Init() error {
- return nil
-}
-
-func (t *testingLogger) Cancel() {
-}
-
-func (t *testingLogger) Type() types.TargetType {
- return types.TargetHTTP
-}
-
-func (t *testingLogger) IsOnline() bool {
- return true
-}
-
-// Stats returns the target statistics.
-func (t *testingLogger) Stats() types.TargetStats {
- return types.TargetStats{}
-}
-
-func (t *testingLogger) Send(entry interface{}) error {
- t.mu.Lock()
- defer t.mu.Unlock()
- if t.t == nil {
- return nil
- }
- e, ok := entry.(log.Entry)
- if !ok {
- return fmt.Errorf("unexpected log entry structure %#v", entry)
- }
-
- t.t.Helper()
- t.t.Log(e.Level, ":", e.Message)
- return nil
-}
-
-func addTestingLogging(t testLoggerI) func() {
- tl := &testingLogger{t: t}
- logger.AddSystemTarget(tl)
- return func() {
- tl.mu.Lock()
- defer tl.mu.Unlock()
- tl.t = nil
- }
-}
-
-func TestDataUpdateTracker(t *testing.T) {
- dut := newDataUpdateTracker()
- // Change some defaults.
- dut.debug = testing.Verbose()
- dut.input = make(chan string)
- dut.save = make(chan struct{})
-
- defer addTestingLogging(t)()
-
- dut.Current.bf = dut.newBloomFilter()
-
- tmpDir := t.TempDir()
- err := os.MkdirAll(filepath.Dir(filepath.Join(tmpDir, dataUpdateTrackerFilename)), os.ModePerm)
- if err != nil {
- t.Fatal(err)
- }
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- dut.start(ctx, tmpDir)
-
- tests := []struct {
- in string
- check []string // if not empty, check against these instead.
- exist bool
- }{
- {
- in: "bucket/directory/file.txt",
- check: []string{"bucket", "bucket/", "/bucket", "bucket/directory", "bucket/directory/", "bucket/directory/file.txt", "/bucket/directory/file.txt"},
- exist: true,
- },
- {
- // System bucket
- in: ".minio.sys/ignoreme/pls",
- exist: false,
- },
- {
- // Not a valid bucket
- in: "./bucket/okfile.txt",
- check: []string{"./bucket/okfile.txt", "/bucket/okfile.txt", "bucket/okfile.txt"},
- exist: false,
- },
- {
- // Not a valid bucket
- in: "æ/okfile.txt",
- check: []string{"æ/okfile.txt", "æ/okfile.txt", "æ"},
- exist: false,
- },
- {
- in: "/bucket2/okfile2.txt",
- check: []string{"./bucket2/okfile2.txt", "/bucket2/okfile2.txt", "bucket2/okfile2.txt", "bucket2"},
- exist: true,
- },
- {
- in: "/bucket3/prefix/okfile2.txt",
- check: []string{"./bucket3/prefix/okfile2.txt", "/bucket3/prefix/okfile2.txt", "bucket3/prefix/okfile2.txt", "bucket3/prefix", "bucket3"},
- exist: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.in, func(t *testing.T) {
- dut.input <- tt.in
- dut.input <- "" // Sending empty string ensures the previous is added to filter.
- dut.mu.Lock()
- defer dut.mu.Unlock()
- if len(tt.check) == 0 {
- got := dut.Current.bf.containsDir(tt.in)
- if got != tt.exist {
- // For unlimited tests this could lead to false positives,
- // but it should be deterministic.
- t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist)
- }
- return
- }
- for _, check := range tt.check {
- got := dut.Current.bf.containsDir(check)
- if got != tt.exist {
- // For unlimited tests this could lead to false positives,
- // but it should be deterministic.
- t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist)
- }
- continue
- }
- })
- }
- // Cycle to history
- req := bloomFilterRequest{
- Oldest: 1,
- Current: 2,
- }
-
- _, err = dut.cycleFilter(ctx, req)
- if err != nil {
- t.Fatal(err)
- }
- dut.input <- "cycle2/file.txt"
- dut.input <- "" // Sending empty string ensures the previous is added to filter.
-
- tests = append(tests, struct {
- in string
- check []string
- exist bool
- }{in: "cycle2/file.txt", exist: true})
-
- // Shut down
- cancel()
- <-dut.saveExited
-
- if dut.current() != 2 {
- t.Fatal("wrong current idx after save. want 2, got:", dut.current())
- }
-
- ctx, cancel = context.WithCancel(context.Background())
- // Reload...
- dut = newDataUpdateTracker()
- dut.start(ctx, tmpDir)
- defer func() {
- cancel()
- <-dut.saveExited
- }()
-
- if dut.current() != 2 {
- t.Fatal("current idx after load not preserved. want 2, got:", dut.current())
- }
- req = bloomFilterRequest{
- Oldest: 1,
- Current: 3,
- }
- bfr2, err := dut.cycleFilter(ctx, req)
- if err != nil {
- t.Fatal(err)
- }
- if !bfr2.Complete {
- t.Fatal("Wanted complete, didn't get it")
- }
- if bfr2.CurrentIdx != 3 {
- t.Fatal("wanted index 3, got", bfr2.CurrentIdx)
- }
- if bfr2.OldestIdx != 1 {
- t.Fatal("wanted oldest index 3, got", bfr2.OldestIdx)
- }
-
- t.Logf("Size of filter %d bytes, M: %d, K:%d", len(bfr2.Filter), dut.Current.bf.Cap(), dut.Current.bf.K())
- // Rerun test with returned bfr2
- bf := dut.newBloomFilter()
- _, err = bf.ReadFrom(bytes.NewReader(bfr2.Filter))
- if err != nil {
- t.Fatal(err)
- }
- for _, tt := range tests {
- t.Run(tt.in+"-reloaded", func(t *testing.T) {
- if len(tt.check) == 0 {
- got := bf.containsDir(tt.in)
- if got != tt.exist {
- // For unlimited tests this could lead to false positives,
- // but it should be deterministic.
- t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist)
- }
- return
- }
- for _, check := range tt.check {
- got := bf.containsDir(check)
- if got != tt.exist {
- // For unlimited tests this could lead to false positives,
- // but it should be deterministic.
- t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist)
- }
- continue
- }
- })
- }
-}
-
-func BenchmarkDataUpdateTracker(b *testing.B) {
- dut := newDataUpdateTracker()
- // Change some defaults.
- dut.debug = false
- dut.input = make(chan string)
- dut.save = make(chan struct{})
-
- defer addTestingLogging(b)()
-
- dut.Current.bf = dut.newBloomFilter()
- // We do this unbuffered. This will very significantly reduce throughput, so this is a worst case.
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go dut.startCollector(ctx)
- input := make([]string, 1000)
- rng := rand.New(rand.NewSource(0xabad1dea))
- tmp := []string{"bucket", "aprefix", "nextprefixlevel", "maybeobjname", "evendeeper", "ok-one-morelevel", "final.object"}
- for i := range input {
- tmp := tmp[:1+rng.Intn(cap(tmp)-1)]
- input[i] = path.Join(tmp...)
- }
- b.SetBytes(1)
- b.ResetTimer()
- b.ReportAllocs()
- for i := 0; i < b.N; i++ {
- dut.input <- input[rng.Intn(len(input))]
- }
-}
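
Note: the deleted test's save/reload cycle boiled down to the fact that bloom.BloomFilter implements io.WriterTo and io.ReaderFrom, so a filter can be round-tripped through a byte buffer. A minimal sketch:

package main

import (
	"bytes"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	src := bloom.NewWithEstimates(1000, 0.01)
	src.AddString("bucket/obj")

	// Serialize, as the saver goroutine did...
	var buf bytes.Buffer
	if _, err := src.WriteTo(&buf); err != nil {
		panic(err)
	}

	// ...and deserialize into an empty filter, as deserialize() did.
	dst := &bloom.BloomFilter{}
	if _, err := dst.ReadFrom(&buf); err != nil {
		panic(err)
	}
	fmt.Println(dst.TestString("bucket/obj")) // true
}
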
diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go
index 271b6a86f..d8b572978 100644
--- a/cmd/data-usage-cache.go
+++ b/cmd/data-usage-cache.go
@@ -276,7 +276,6 @@ type dataUsageCacheInfo struct {
// indicates if the disk is being healed and scanner
// should skip healing the disk
SkipHealing bool
- BloomFilter []byte `msg:"BloomFilter,omitempty"`
// Active lifecycle, if any on the bucket
lifeCycle *lifecycle.Lifecycle `msg:"-"`
@@ -649,10 +648,7 @@ func (d *dataUsageCache) reduceChildrenOf(path dataUsageHash, limit int, compact
// StringAll returns a detailed string representation of all entries in the cache.
func (d *dataUsageCache) StringAll() string {
// Remove bloom filter from print.
- bf := d.Info.BloomFilter
- d.Info.BloomFilter = nil
s := fmt.Sprintf("info:%+v\n", d.Info)
- d.Info.BloomFilter = bf
for k, v := range d.Cache {
s += fmt.Sprintf("\t%v: %+v\n", k, v)
}
diff --git a/cmd/data-usage-cache_gen.go b/cmd/data-usage-cache_gen.go
index 2c3e250c8..d124787b8 100644
--- a/cmd/data-usage-cache_gen.go
+++ b/cmd/data-usage-cache_gen.go
@@ -629,12 +629,6 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "SkipHealing")
return
}
- case "BloomFilter":
- z.BloomFilter, err = dc.ReadBytes(z.BloomFilter)
- if err != nil {
- err = msgp.WrapError(err, "BloomFilter")
- return
- }
default:
err = dc.Skip()
if err != nil {
@@ -648,24 +642,9 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) {
- // omitempty: check for empty values
- zb0001Len := uint32(5)
- var zb0001Mask uint8 /* 5 bits */
- _ = zb0001Mask
- if z.BloomFilter == nil {
- zb0001Len--
- zb0001Mask |= 0x10
- }
- // variable map header, size zb0001Len
- err = en.Append(0x80 | uint8(zb0001Len))
- if err != nil {
- return
- }
- if zb0001Len == 0 {
- return
- }
+ // map header, size 4
// write "Name"
- err = en.Append(0xa4, 0x4e, 0x61, 0x6d, 0x65)
+ err = en.Append(0x84, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
if err != nil {
return
}
@@ -704,39 +683,15 @@ func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "SkipHealing")
return
}
- if (zb0001Mask & 0x10) == 0 { // if not empty
- // write "BloomFilter"
- err = en.Append(0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72)
- if err != nil {
- return
- }
- err = en.WriteBytes(z.BloomFilter)
- if err != nil {
- err = msgp.WrapError(err, "BloomFilter")
- return
- }
- }
return
}
// MarshalMsg implements msgp.Marshaler
func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
- // omitempty: check for empty values
- zb0001Len := uint32(5)
- var zb0001Mask uint8 /* 5 bits */
- _ = zb0001Mask
- if z.BloomFilter == nil {
- zb0001Len--
- zb0001Mask |= 0x10
- }
- // variable map header, size zb0001Len
- o = append(o, 0x80|uint8(zb0001Len))
- if zb0001Len == 0 {
- return
- }
+ // map header, size 4
// string "Name"
- o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
+ o = append(o, 0x84, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = msgp.AppendString(o, z.Name)
// string "NextCycle"
o = append(o, 0xa9, 0x4e, 0x65, 0x78, 0x74, 0x43, 0x79, 0x63, 0x6c, 0x65)
@@ -747,11 +702,6 @@ func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) {
// string "SkipHealing"
o = append(o, 0xab, 0x53, 0x6b, 0x69, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67)
o = msgp.AppendBool(o, z.SkipHealing)
- if (zb0001Mask & 0x10) == 0 { // if not empty
- // string "BloomFilter"
- o = append(o, 0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72)
- o = msgp.AppendBytes(o, z.BloomFilter)
- }
return
}
@@ -797,12 +747,6 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "SkipHealing")
return
}
- case "BloomFilter":
- z.BloomFilter, bts, err = msgp.ReadBytesBytes(bts, z.BloomFilter)
- if err != nil {
- err = msgp.WrapError(err, "BloomFilter")
- return
- }
default:
bts, err = msgp.Skip(bts)
if err != nil {
@@ -817,7 +761,7 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *dataUsageCacheInfo) Msgsize() (s int) {
- s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 10 + msgp.Uint32Size + 11 + msgp.TimeSize + 12 + msgp.BoolSize + 12 + msgp.BytesPrefixSize + len(z.BloomFilter)
+ s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 10 + msgp.Uint32Size + 11 + msgp.TimeSize + 12 + msgp.BoolSize
return
}
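
Note: the generated encoder above replaces the runtime-computed variable map header (0x80|len, needed while BloomFilter was omitempty) with a constant fixmap header: in msgpack, 0x84 denotes a map with exactly four key/value pairs. A tiny illustration of the byte arithmetic:

package main

import "fmt"

func main() {
	const fields = 4
	header := 0x80 | byte(fields) // msgpack fixmap: low nibble is the pair count
	fmt.Printf("%#x\n", header)   // 0x84, matching the generated Append above
}
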
diff --git a/cmd/erasure-encode_test.go b/cmd/erasure-encode_test.go
index 925c6c434..e30d7e970 100644
--- a/cmd/erasure-encode_test.go
+++ b/cmd/erasure-encode_test.go
@@ -24,7 +24,7 @@ import (
"io"
"testing"
- humanize "github.com/dustin/go-humanize"
+ "github.com/dustin/go-humanize"
)
type badDisk struct{ StorageAPI }
@@ -41,10 +41,6 @@ func (a badDisk) ReadFileStream(ctx context.Context, volume, path string, offset
return nil, errFaultyDisk
}
-func (a badDisk) UpdateBloomFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) {
- return nil, errFaultyDisk
-}
-
func (a badDisk) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error {
return errFaultyDisk
}
diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index daba0dba6..dea5f044a 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -72,10 +72,6 @@ func (fi FileInfo) DataShardFixed() bool {
func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
result madmin.HealResultItem, err error,
) {
- if !opts.DryRun {
- defer NSUpdated(bucket, slashSeparator)
- }
-
storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints()
@@ -754,7 +750,6 @@ func (er *erasureObjects) healObjectDir(ctx context.Context, bucket, object stri
}(index, disk)
}
wg.Wait()
- NSUpdated(bucket, object)
}
}
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 9e8aaa000..266a9d704 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -1216,7 +1216,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
if err != nil {
return oi, toObjectErr(err, bucket, object)
}
- defer NSUpdated(bucket, object)
if !opts.Speedtest && versionsDisparity {
listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity)
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index ba7588112..7fd07bf2b 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -72,8 +72,6 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
return oi, NotImplemented{}
}
- defer NSUpdated(dstBucket, dstObject)
-
if !dstOpts.NoLock {
lk := er.NewNSLock(dstBucket, dstObject)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
@@ -483,7 +481,6 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
if opts.VersionID != "" {
err = errFileVersionNotFound
}
- defer NSUpdated(bucket, object)
fi := FileInfo{
VersionID: m.VersionID,
@@ -1271,8 +1268,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
- defer NSUpdated(bucket, object)
-
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
// Object info is the same in all disks, so we can pick
@@ -1475,8 +1470,6 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
} else {
errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName)
}
-
- defer NSUpdated(bucket, objects[objIndex].ObjectName)
}
// Check failed deletes across multiple objects
@@ -1568,9 +1561,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
}
}
- objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
- defer NSUpdated(bucket, object)
-
storageDisks := er.getDisks()
versionFound := true
objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
@@ -1940,7 +1930,6 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
if fi.TransitionStatus == lifecycle.TransitionComplete {
return nil
}
- defer NSUpdated(bucket, object)
if fi.XLV1 {
if _, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{NoLock: true}); err != nil {
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 5e8d70895..346ce8840 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -56,9 +56,6 @@ type erasureServerPools struct {
serverPools []*erasureSets
- // Shut down async operations
- shutdown context.CancelFunc
-
// Active decommission canceler
decommissionCancelers []context.CancelFunc
@@ -160,14 +157,7 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
break
}
- drives := make([]string, 0, len(localDrives))
- for _, localDrive := range localDrives {
- drives = append(drives, localDrive.Endpoint().Path)
- }
-
globalLocalDrives = localDrives
- ctx, z.shutdown = context.WithCancel(ctx)
- go intDataUpdateTracker.start(ctx, drives...)
return z, nil
}
@@ -524,8 +514,6 @@ func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object stri
}
func (z *erasureServerPools) Shutdown(ctx context.Context) error {
- defer z.shutdown()
-
g := errgroup.WithNErrs(len(z.serverPools))
for index := range z.serverPools {
@@ -594,7 +582,7 @@ func (z *erasureServerPools) StorageInfo(ctx context.Context) StorageInfo {
return globalNotificationSys.StorageInfo(z)
}
-func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error {
+func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error {
// Updates must be closed before we return.
defer close(updates)
@@ -639,7 +627,7 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd
}
}()
// Start scanner. Blocks until done.
- err := erObj.nsScanner(ctx, allBuckets, bf, wantCycle, updates, healScanMode)
+ err := erObj.nsScanner(ctx, allBuckets, wantCycle, updates, healScanMode)
if err != nil {
logger.LogIf(ctx, err)
mu.Lock()
@@ -712,8 +700,6 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd
// even if one of the sets fail to create buckets, we proceed all the successful
// operations.
func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
- defer NSUpdated(bucket, slashSeparator)
-
// Verify if bucket is valid.
if !isMinioMetaBucketName(bucket) {
if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil {
@@ -1637,7 +1623,6 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, op
return BucketNameInvalid{Bucket: bucket}
}
- defer NSUpdated(bucket, slashSeparator)
if !opts.NoLock {
// Lock the bucket name before creating.
lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck")
diff --git a/cmd/erasure.go b/cmd/erasure.go
index df5d10b19..7cf97a785 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -33,7 +33,6 @@ import (
"github.com/minio/minio/internal/dsync"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
- "github.com/minio/pkg/console"
)
// list all errors that can be ignore in a bucket operation.
@@ -351,7 +350,7 @@ func (er erasureObjects) cleanupDeletedObjects(ctx context.Context) {
// nsScanner will start scanning buckets and send updated totals as they are traversed.
// Updates are sent on a regular basis and the caller *must* consume them.
-func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, wantCycle uint32, updates chan<- dataUsageCache, healScanMode madmin.HealScanMode) error {
+func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wantCycle uint32, updates chan<- dataUsageCache, healScanMode madmin.HealScanMode) error {
if len(buckets) == 0 {
return nil
}
@@ -377,7 +376,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
},
Cache: make(map[string]dataUsageEntry, len(oldCache.Cache)),
}
- bloom := bf.bytes()
// Put all buckets into channel.
bucketCh := make(chan BucketInfo, len(buckets))
@@ -473,7 +471,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
if cache.Info.Name == "" {
cache.Info.Name = bucket.Name
}
- cache.Info.BloomFilter = bloom
cache.Info.SkipHealing = healing
cache.Info.NextCycle = wantCycle
if cache.Info.Name != bucket.Name {
@@ -496,16 +493,12 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
Parent: dataUsageRoot,
Entry: update,
}
- if intDataUpdateTracker.debug {
- console.Debugln("z:", er.poolIndex, "s:", er.setIndex, "bucket", name, "got update", update)
- }
}
}(cache.Info.Name)
// Calc usage
before := cache.Info.LastUpdate
var err error
cache, err = disk.NSScanner(ctx, cache, updates, healScanMode)
- cache.Info.BloomFilter = nil
if err != nil {
if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
diff --git a/cmd/notification.go b/cmd/notification.go
index f3dd03e00..22889727e 100644
--- a/cmd/notification.go
+++ b/cmd/notification.go
@@ -18,9 +18,7 @@
package cmd
import (
- "bytes"
"context"
- "encoding/json"
"errors"
"fmt"
"io"
@@ -29,7 +27,6 @@ import (
"sync"
"time"
- "github.com/bits-and-blooms/bloom/v3"
"github.com/cespare/xxhash/v2"
"github.com/klauspost/compress/zip"
"github.com/minio/madmin-go/v2"
@@ -384,75 +381,6 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE
return ng.Wait()
}
-// updateBloomFilter will cycle all servers to the current index and
-// return a merged bloom filter if a complete one can be retrieved.
-func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint64) (*bloomFilter, error) {
- req := bloomFilterRequest{
- Current: current,
- Oldest: current - dataUsageUpdateDirCycles,
- }
- if current < dataUsageUpdateDirCycles {
- req.Oldest = 0
- }
-
- // Load initial state from local...
- var bf *bloomFilter
- bfr, err := intDataUpdateTracker.cycleFilter(ctx, req)
- logger.LogIf(ctx, err)
- if err == nil && bfr.Complete {
- nbf := intDataUpdateTracker.newBloomFilter()
- bf = &nbf
- _, err = bf.ReadFrom(bytes.NewReader(bfr.Filter))
- logger.LogIf(ctx, err)
- }
-
- var mu sync.Mutex
- g := errgroup.WithNErrs(len(sys.peerClients))
- for idx, client := range sys.peerClients {
- if client == nil {
- continue
- }
- client := client
- g.Go(func() error {
- serverBF, err := client.cycleServerBloomFilter(ctx, req)
- if false && intDataUpdateTracker.debug {
- b, _ := json.MarshalIndent(serverBF, "", " ")
- logger.Info("Drive %v, Bloom filter: %v", client.host.Name, string(b))
- }
- // Keep lock while checking result.
- mu.Lock()
- defer mu.Unlock()
-
- if err != nil || !serverBF.Complete || bf == nil {
- logger.LogOnceIf(ctx, err, client.host.String(), client.cycleServerBloomFilter)
- bf = nil
- return nil
- }
-
- var tmp bloom.BloomFilter
- _, err = tmp.ReadFrom(bytes.NewReader(serverBF.Filter))
- if err != nil {
- logger.LogIf(ctx, err)
- bf = nil
- return nil
- }
- if bf.BloomFilter == nil {
- bf.BloomFilter = &tmp
- } else {
- err = bf.Merge(&tmp)
- if err != nil {
- logger.LogIf(ctx, err)
- bf = nil
- return nil
- }
- }
- return nil
- }, idx)
- }
- g.Wait()
- return bf, nil
-}
-
var errPeerNotReachable = errors.New("peer is not reachable")
// GetLocks - makes GetLocks RPC call on all peers.
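
Note: for reference, the removed updateBloomFilter aggregated per-peer filters by merging them into one and discarding the result if any peer failed or was incomplete. A self-contained sketch of that merge step (mergeAll is illustrative; Merge errors out when filter parameters differ):

package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

// mergeAll ORs all filters together; any mismatch or nil input yields nil,
// mirroring how the removed code dropped incomplete results.
func mergeAll(filters []*bloom.BloomFilter) *bloom.BloomFilter {
	if len(filters) == 0 {
		return nil
	}
	out := filters[0].Copy()
	for _, f := range filters[1:] {
		if f == nil || out.Merge(f) != nil {
			return nil
		}
	}
	return out
}

func main() {
	a := bloom.NewWithEstimates(1000, 0.01)
	b := bloom.NewWithEstimates(1000, 0.01)
	a.AddString("x")
	b.AddString("y")
	m := mergeAll([]*bloom.BloomFilter{a, b})
	fmt.Println(m.TestString("x"), m.TestString("y")) // true true
}
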
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go
index f0f47a084..910aea211 100644
--- a/cmd/object-api-interface.go
+++ b/cmd/object-api-interface.go
@@ -195,7 +195,7 @@ type ObjectLayer interface {
// Storage operations.
Shutdown(context.Context) error
- NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, scanMode madmin.HealScanMode) error
+ NSScanner(ctx context.Context, updates chan<- DataUsageInfo, wantCycle uint32, scanMode madmin.HealScanMode) error
BackendInfo() madmin.BackendInfo
StorageInfo(ctx context.Context) StorageInfo
LocalStorageInfo(ctx context.Context) StorageInfo
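
Note: the interface change means implementors and callers of NSScanner now deal only with the updates channel, the wanted cycle, and the scan mode. A reduced sketch with simplified stand-in types (DataUsageInfo and HealScanMode are trimmed here; per the pool implementation above, the scanner closes updates before returning):

package main

import "context"

// Simplified stand-ins for the real types.
type DataUsageInfo struct{ ObjectsTotalCount uint64 }

type HealScanMode int

type scannerLayer interface {
	NSScanner(ctx context.Context, updates chan<- DataUsageInfo, wantCycle uint32, scanMode HealScanMode) error
}

// runOnce drives one scan: consume updates until the implementation closes
// the channel, then return the scanner's error.
func runOnce(ctx context.Context, o scannerLayer, cycle uint32) error {
	updates := make(chan DataUsageInfo, 1)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range updates {
		}
	}()
	err := o.NSScanner(ctx, updates, cycle, 0)
	<-done
	return err
}

type nopScanner struct{}

func (nopScanner) NSScanner(ctx context.Context, updates chan<- DataUsageInfo, wantCycle uint32, scanMode HealScanMode) error {
	defer close(updates) // implementations close updates before returning
	updates <- DataUsageInfo{ObjectsTotalCount: 1}
	return nil
}

func main() {
	if err := runOnce(context.Background(), nopScanner{}, 1); err != nil {
		panic(err)
	}
}
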
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go
index be58e91b0..32eb04492 100644
--- a/cmd/peer-rest-client.go
+++ b/cmd/peer-rest-client.go
@@ -309,25 +309,6 @@ func (client *peerRESTClient) DeleteBucketMetadata(bucket string) error {
return nil
}
-// cycleServerBloomFilter will cycle the bloom filter to start recording to index y if not already.
-// The response will contain a bloom filter starting at index x up to, but not including index y.
-// If y is 0, the response will not update y, but return the currently recorded information
-// from the current x to y-1.
-func (client *peerRESTClient) cycleServerBloomFilter(ctx context.Context, req bloomFilterRequest) (*bloomFilterResponse, error) {
- var reader bytes.Buffer
- err := gob.NewEncoder(&reader).Encode(req)
- if err != nil {
- return nil, err
- }
- respBody, err := client.callWithContext(ctx, peerRESTMethodCycleBloom, nil, &reader, -1)
- if err != nil {
- return nil, err
- }
- var resp bloomFilterResponse
- defer xhttp.DrainBody(respBody)
- return &resp, gob.NewDecoder(respBody).Decode(&resp)
-}
-
// DeletePolicy - delete a specific canned policy.
func (client *peerRESTClient) DeletePolicy(policyName string) (err error) {
values := make(url.Values)
diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go
index f24edd2c4..611c1b9cf 100644
--- a/cmd/peer-rest-common.go
+++ b/cmd/peer-rest-common.go
@@ -18,7 +18,7 @@
package cmd
const (
- peerRESTVersion = "v29" // Added LocalStorageInfo peer API
+ peerRESTVersion = "v30" // Removed bloom filter
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go
index 4fe2e2fdb..3af2d399e 100644
--- a/cmd/peer-rest-server.go
+++ b/cmd/peer-rest-server.go
@@ -631,30 +631,6 @@ func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *htt
}
}
-// CycleServerBloomFilterHandler cycles bloom filter on server.
-func (s *peerRESTServer) CycleServerBloomFilterHandler(w http.ResponseWriter, r *http.Request) {
- if !s.IsValid(w, r) {
- s.writeErrorResponse(w, errors.New("Invalid request"))
- return
- }
-
- ctx := newContext(r, w, "CycleServerBloomFilter")
-
- var req bloomFilterRequest
- err := gob.NewDecoder(r.Body).Decode(&req)
- if err != nil {
- s.writeErrorResponse(w, err)
- return
- }
- bf, err := intDataUpdateTracker.cycleFilter(ctx, req)
- if err != nil {
- s.writeErrorResponse(w, err)
- return
- }
-
- logger.LogIf(ctx, gob.NewEncoder(w).Encode(bf))
-}
-
func (s *peerRESTServer) GetMetacacheListingHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
@@ -1390,7 +1366,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodOsInfo).HandlerFunc(httpTraceHdrs(server.GetOSInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDiskHwInfo).HandlerFunc(httpTraceHdrs(server.GetPartitionsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCPUInfo).HandlerFunc(httpTraceHdrs(server.GetCPUsHandler))
- subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetAllBucketStats).HandlerFunc(httpTraceHdrs(server.GetAllBucketStatsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 9aef87a0a..62c451a4f 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -41,11 +41,9 @@ import (
"github.com/klauspost/filepathx"
"github.com/minio/madmin-go/v2"
"github.com/minio/minio/internal/bucket/lifecycle"
- "github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/disk"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
- "github.com/minio/pkg/console"
"github.com/zeebo/xxh3"
)
@@ -448,9 +446,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
lc, err = globalLifecycleSys.Get(cache.Info.Name)
if err == nil && lc.HasActiveRules("") {
cache.Info.lifeCycle = lc
- if intDataUpdateTracker.debug {
- console.Debugln(color.Green("scannerDisk:") + " lifecycle: Active rules found")
- }
}
}
@@ -463,10 +458,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
Config: rcfg,
remotes: tgts,
}
- if intDataUpdateTracker.debug {
- console.Debugln(color.Green("scannerDisk:") + " replication: Active rules found")
- }
-
}
}
}
@@ -502,9 +493,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
doneSz(len(buf))
res["metasize"] = fmt.Sprint(len(buf))
if err != nil {
- if intDataUpdateTracker.debug {
- console.Debugf(color.Green("scannerBucket:")+" object path missing: %v: %w\n", item.Path, err)
- }
+ res["err"] = err.Error()
return sizeSummary{}, errSkipFile
}
defer metaDataPoolPut(buf)
@@ -514,9 +503,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
fivs, err := getFileInfoVersions(buf, item.bucket, item.objectPath())
if err != nil {
- if intDataUpdateTracker.debug {
- console.Debugf(color.Green("scannerBucket:")+" reading xl.meta failed: %v: %w\n", item.Path, err)
- }
+ res["err"] = err.Error()
return sizeSummary{}, errSkipFile
}
@@ -531,9 +518,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
done()
if err != nil {
- if intDataUpdateTracker.debug {
- console.Debugf(color.Green("scannerBucket:")+" applying version actions failed: %v: %w\n", item.Path, err)
- }
+ res["err"] = err.Error()
return sizeSummary{}, errSkipFile
}
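
Note: the pattern in this file: debug prints that were gated on the deleted tracker's debug flag become entries in the per-item result map the scanner already reports. A small sketch of that shape with illustrative helpers (scanItem, readMetadata):

package main

import (
	"errors"
	"fmt"
)

var errSkipFile = errors.New("skip this file")

// readMetadata stands in for reading xl.meta; always failing here.
func readMetadata() error { return errors.New("object path missing") }

func scanItem(res map[string]string) error {
	if err := readMetadata(); err != nil {
		res["err"] = err.Error() // recorded in the result map instead of console debug
		return errSkipFile
	}
	return nil
}

func main() {
	res := map[string]string{}
	_ = scanItem(res)
	fmt.Println(res["err"]) // "object path missing"
}
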
@@ -603,7 +588,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
}
}
}
-
return sizeS, nil
}, scanMode)
if err != nil {