Remove bloom filter (#16682)

Removes the bloom filter since it has such limited usability, often gets saturated anyway, and adds a bunch of complexity to the scanner.

Also saves a tiny bit of CPU on each write operation.
This commit is contained in:
Klaus Post 2023-02-24 04:33:31 +01:00 committed by GitHub
parent b21d3f9b82
commit 9acf1024e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 20 additions and 1281 deletions

View File

@ -18,7 +18,6 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
@ -34,7 +33,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"github.com/minio/madmin-go/v2" "github.com/minio/madmin-go/v2"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
@ -162,7 +160,6 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
// Load current bloom cycle // Load current bloom cycle
var cycleInfo currentScannerCycle var cycleInfo currentScannerCycle
cycleInfo.next = intDataUpdateTracker.current() + 1
buf, _ := readConfig(ctx, objAPI, dataUsageBloomNamePath) buf, _ := readConfig(ctx, objAPI, dataUsageBloomNamePath)
if len(buf) == 8 { if len(buf) == 8 {
@ -207,9 +204,7 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
// Wait before starting next cycle and wait on startup. // Wait before starting next cycle and wait on startup.
results := make(chan DataUsageInfo, 1) results := make(chan DataUsageInfo, 1)
go storeDataUsageInBackend(ctx, objAPI, results) go storeDataUsageInBackend(ctx, objAPI, results)
bf, err := globalNotificationSys.updateBloomFilter(ctx, cycleInfo.current) err := objAPI.NSScanner(ctx, results, uint32(cycleInfo.current), scanMode)
logger.LogIf(ctx, err)
err = objAPI.NSScanner(ctx, bf, results, uint32(cycleInfo.current), scanMode)
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
res := map[string]string{"cycle": fmt.Sprint(cycleInfo.current)} res := map[string]string{"cycle": fmt.Sprint(cycleInfo.current)}
if err != nil { if err != nil {
@ -248,10 +243,8 @@ type folderScanner struct {
oldCache dataUsageCache oldCache dataUsageCache
newCache dataUsageCache newCache dataUsageCache
updateCache dataUsageCache updateCache dataUsageCache
withFilter *bloomFilter
dataUsageScannerDebug bool dataUsageScannerDebug bool
healFolderInclude uint32 // Include a clean folder one in n cycles.
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
scanMode madmin.HealScanMode scanMode madmin.HealScanMode
@ -310,8 +303,6 @@ type folderScanner struct {
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner. // Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
// If the supplied context is canceled the function will return at the first chance. // If the supplied context is canceled the function will return at the first chance.
func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) { func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) {
logPrefix := color.Green("data-usage: ")
switch cache.Info.Name { switch cache.Info.Name {
case "", dataUsageRoot: case "", dataUsageRoot:
return cache, errors.New("internal error: root scan attempted") return cache, errors.New("internal error: root scan attempted")
@ -325,8 +316,7 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
oldCache: cache, oldCache: cache,
newCache: dataUsageCache{Info: cache.Info}, newCache: dataUsageCache{Info: cache.Info},
updateCache: dataUsageCache{Info: cache.Info}, updateCache: dataUsageCache{Info: cache.Info},
dataUsageScannerDebug: intDataUpdateTracker.debug, dataUsageScannerDebug: false,
healFolderInclude: 0,
healObjectSelect: 0, healObjectSelect: 0,
scanMode: scanMode, scanMode: scanMode,
updates: cache.Info.updates, updates: cache.Info.updates,
@ -349,19 +339,9 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
// Enable healing in XL mode. // Enable healing in XL mode.
if globalIsErasure && !cache.Info.SkipHealing { if globalIsErasure && !cache.Info.SkipHealing {
// Include a clean folder one in n cycles.
s.healFolderInclude = healFolderIncludeProb
// Do a heal check on an object once every n cycles. Must divide into healFolderInclude // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
s.healObjectSelect = healObjectSelectProb s.healObjectSelect = healObjectSelectProb
} }
if len(cache.Info.BloomFilter) > 0 {
s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
_, err := s.withFilter.ReadFrom(bytes.NewReader(cache.Info.BloomFilter))
if err != nil {
logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter")
s.withFilter = nil
}
}
done := ctx.Done() done := ctx.Done()
@ -418,14 +398,12 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
return ctx.Err() return ctx.Err()
default: default:
} }
existing, ok := f.oldCache.Cache[thisHash.Key()]
var abandonedChildren dataUsageHashMap var abandonedChildren dataUsageHashMap
if !into.Compacted { if !into.Compacted {
abandonedChildren = f.oldCache.findChildrenCopy(thisHash) abandonedChildren = f.oldCache.findChildrenCopy(thisHash)
} }
// If there are lifecycle rules for the prefix, remove the filter. // If there are lifecycle rules for the prefix, remove the filter.
filter := f.withFilter
_, prefix := path2BucketObjectWithBasePath(f.root, folder.name) _, prefix := path2BucketObjectWithBasePath(f.root, folder.name)
var activeLifeCycle *lifecycle.Lifecycle var activeLifeCycle *lifecycle.Lifecycle
if f.oldCache.Info.lifeCycle != nil && f.oldCache.Info.lifeCycle.HasActiveRules(prefix) { if f.oldCache.Info.lifeCycle != nil && f.oldCache.Info.lifeCycle.HasActiveRules(prefix) {
@ -433,33 +411,13 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
console.Debugf(scannerLogPrefix+" Prefix %q has active rules\n", prefix) console.Debugf(scannerLogPrefix+" Prefix %q has active rules\n", prefix)
} }
activeLifeCycle = f.oldCache.Info.lifeCycle activeLifeCycle = f.oldCache.Info.lifeCycle
filter = nil
} }
// If there are replication rules for the prefix, remove the filter. // If there are replication rules for the prefix, remove the filter.
var replicationCfg replicationConfig var replicationCfg replicationConfig
if !f.oldCache.Info.replication.Empty() && f.oldCache.Info.replication.Config.HasActiveRules(prefix, true) { if !f.oldCache.Info.replication.Empty() && f.oldCache.Info.replication.Config.HasActiveRules(prefix, true) {
replicationCfg = f.oldCache.Info.replication replicationCfg = f.oldCache.Info.replication
filter = nil
} }
// Check if we can skip it due to bloom filter... // Check if we can skip it due to bloom filter...
if filter != nil && ok && existing.Compacted {
// If folder isn't in filter and we have data, skip it completely.
if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
if f.healObjectSelect == 0 || !thisHash.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healFolderInclude/folder.objectHealProbDiv) {
f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
f.updateCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
if f.dataUsageScannerDebug {
console.Debugf(scannerLogPrefix+" Skipping non-updated folder: %v\n", folder.name)
}
return nil
}
if f.dataUsageScannerDebug {
console.Debugf(scannerLogPrefix+" Adding non-updated folder to heal check: %v\n", folder.name)
}
// If probability was already scannerHealFolderInclude, keep it.
folder.objectHealProbDiv = f.healFolderInclude
}
}
scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder) scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder)
var existingFolders, newFolders []cachedFolder var existingFolders, newFolders []cachedFolder
@ -673,13 +631,10 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
// and the entry itself is compacted. // and the entry itself is compacted.
if !into.Compacted && f.oldCache.isCompacted(h) { if !into.Compacted && f.oldCache.isCompacted(h) {
if !h.mod(f.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) { if !h.mod(f.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
if f.healObjectSelect == 0 || !h.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healFolderInclude/folder.objectHealProbDiv) { // Transfer and add as child...
// Transfer and add as child... f.newCache.copyWithChildren(&f.oldCache, h, folder.parent)
f.newCache.copyWithChildren(&f.oldCache, h, folder.parent) into.addChild(h)
into.addChild(h) continue
continue
}
folder.objectHealProbDiv = f.healFolderInclude
} }
} }
f.updateCurrentPath(folder.name) f.updateCurrentPath(folder.name)

View File

@ -1,682 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"io"
"os"
"path"
"sort"
"strings"
"sync"
"time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/console"
)
// Settings for the data update tracker and its serialized state.
const (
// Estimate bloom filter size. With this many items
dataUpdateTrackerEstItems = 200000
// ... we want this false positive rate:
dataUpdateTrackerFP = 0.1
// Buffer size of the tracker's input channel; 0 makes the channel unbuffered.
dataUpdateTrackerQueueSize = 0
// Location of the serialized tracker; load and startSaver join it onto each drive path.
dataUpdateTrackerFilename = dataUsageBucket + SlashSeparator + ".tracker.bin"
// Serialization format version, written as the first byte by serialize.
dataUpdateTrackerVersion = 7
// Save interval that start passes to startSaver.
dataUpdateTrackerSaveInterval = 5 * time.Minute
)
// intDataUpdateTracker is the package-level tracker. It is assigned in init
// and is the instance NSUpdated forwards to.
var intDataUpdateTracker *dataUpdateTracker
// init creates the package-level tracker with default settings.
func init() {
intDataUpdateTracker = newDataUpdateTracker()
}
// dataUpdateTracker records updated paths in a bloom filter for the current
// index and keeps the filters of earlier indexes as history.
type dataUpdateTracker struct {
// mu is taken by the methods in this file before they touch Current, History or dirty.
mu sync.Mutex
// input carries paths to startCollector.
input chan string
// save lets cycleFilter ask startSaver for an immediate save.
save chan struct{}
// debug enables debug logging.
debug bool
// saveExited is replaced by startSaver with a channel that is closed when the saver returns.
saveExited chan struct{}
// dirty is set when the filter has changed; startSaver clears it when it serializes.
dirty bool
// Current is the filter being written to.
Current dataUpdateFilter
// History holds the filters that cycleFilter moved out of Current.
History dataUpdateTrackerHistory
// Saved is the timestamp startSaver stamps on each serialized snapshot.
Saved time.Time
}
// newDataUpdateTracker builds a tracker with the default configuration:
// index 1, a fresh bloom filter, and the dirty flag set.
func newDataUpdateTracker() *dataUpdateTracker {
	tracker := new(dataUpdateTracker)
	tracker.Current.idx = 1
	tracker.debug = serverDebugLog
	tracker.input = make(chan string, dataUpdateTrackerQueueSize)
	// One slot so that a save request can be queued without blocking.
	tracker.save = make(chan struct{}, 1)
	tracker.saveExited = make(chan struct{})
	tracker.Current.bf = tracker.newBloomFilter()
	tracker.dirty = true
	return tracker
}
// dataUpdateTrackerHistory is a list of filters from earlier indexes.
type dataUpdateTrackerHistory []dataUpdateFilter
// dataUpdateFilter pairs a bloom filter with the index it was recorded under.
type dataUpdateFilter struct {
idx uint64
bf bloomFilter
}
// bloomFilter embeds *bloom.BloomFilter so the path helpers in this file
// (containsDir, bytes) can be attached to it.
type bloomFilter struct {
*bloom.BloomFilter
}
// emptyBloomFilter wraps a zero-value bloom.BloomFilter, ready to be filled
// by ReadFrom.
func emptyBloomFilter() bloomFilter {
	var filter bloomFilter
	filter.BloomFilter = new(bloom.BloomFilter)
	return filter
}
// containsDir reports whether the filter holds the given directory.
// The path is cleaned and split before its hash is looked up; a path with no
// parts left after splitting is never contained.
// Note that objects in XL mode are also considered directories.
func (b bloomFilter) containsDir(in string) bool {
	parts := splitPathDeterministic(path.Clean(in))
	if len(parts) > 0 {
		key := hashPath(path.Join(parts...)).String()
		return b.TestString(key)
	}
	return false
}
// bytes serializes the bloom filter into a byte slice.
// It returns nil for a nil receiver, a receiver without a filter, or when
// writing fails (the failure is logged).
func (b *bloomFilter) bytes() []byte {
	if b == nil {
		return nil
	}
	if b.BloomFilter == nil {
		return nil
	}
	out := new(bytes.Buffer)
	if _, err := b.WriteTo(out); err != nil {
		logger.LogIf(GlobalContext, err)
		return nil
	}
	return out.Bytes()
}
// sort orders the history in place, newest (highest index) first.
// It returns whether the history is complete, i.e. whether the indexes form
// one unbroken run from newest to oldest. An empty history is complete.
// The check assumes that every index occurs at most once.
func (d dataUpdateTrackerHistory) sort() bool {
	if len(d) == 0 {
		return true
	}
	sort.Slice(d, func(i, j int) bool {
		return d[i].idx > d[j].idx
	})
	// n consecutive indexes are n-1 apart from first to last.
	return d[0].idx-d[len(d)-1].idx == uint64(len(d)-1)
}
// removeOlderThan drops every entry whose index is below n.
// The history is sorted first and then cut at the first entry that is too old.
func (d *dataUpdateTrackerHistory) removeOlderThan(n uint64) {
	d.sort()
	hist := *d
	keep := len(hist)
	for i := range hist {
		if hist[i].idx < n {
			keep = i
			break
		}
	}
	*d = hist[:keep]
}
// newBloomFilter creates a bloom filter sized from the package defaults
// (estimated item count and false positive rate).
func (d *dataUpdateTracker) newBloomFilter() bloomFilter {
	sized := bloom.NewWithEstimates(dataUpdateTrackerEstItems, dataUpdateTrackerFP)
	return bloomFilter{BloomFilter: sized}
}
// current reads the index of the active filter under the tracker lock.
func (d *dataUpdateTracker) current() uint64 {
	d.mu.Lock()
	idx := d.Current.idx
	d.mu.Unlock()
	return idx
}
// latestWithDir finds the highest index whose filter contains dir.
// Any cycle above the returned index does NOT contain the entry.
// Paths without a bucket, or in a reserved/invalid bucket, yield the current index.
func (d *dataUpdateTracker) latestWithDir(dir string) uint64 {
	logPrefix := color.Green("dataUpdateTracker:")
	bucket, _ := path2BucketObjectWithBasePath("", dir)
	switch {
	case bucket == "":
		if d.debug && len(dir) > 0 {
			console.Debugf(logPrefix+" no bucket (%s)\n", dir)
		}
		return d.current()
	case isReservedOrInvalidBucket(bucket, false):
		return d.current()
	}

	d.mu.Lock()
	defer d.mu.Unlock()
	if d.Current.bf.containsDir(dir) || d.Current.idx == 0 {
		return d.Current.idx
	}
	if d.debug {
		console.Debugf(logPrefix+" current bloom does NOT contains dir %s\n", dir)
	}
	// Walk back through history until the index is unknown, matches, or reaches zero.
	for idx := d.Current.idx - 1; ; idx-- {
		f := d.History.find(idx)
		if f == nil || f.bf.containsDir(dir) || idx == 0 {
			return idx
		}
	}
}
// start will load the current data from the drives start collecting information and
// start a saver goroutine.
// All of these will exit when the context is canceled.
// When no drives are given an error is logged and nothing is started.
func (d *dataUpdateTracker) start(ctx context.Context, drives ...string) {
if len(drives) == 0 {
logger.LogIf(ctx, errors.New("dataUpdateTracker.start: No local drives specified"))
return
}
d.load(ctx, drives...)
go d.startCollector(ctx)
// startSaver will unlock.
// The lock is taken here, before the goroutine is launched, and handed over to startSaver.
d.mu.Lock()
go d.startSaver(ctx, dataUpdateTrackerSaveInterval, drives...)
}
// load tries to read tracker state from each of the supplied drives.
// Every readable file is handed to deserialize together with d.Saved, which
// only accepts data stamped later than that time.
// Missing files are skipped silently; other open errors are logged.
// Truncated files (io.EOF, io.ErrUnexpectedEOF) are not logged.
// If the tracker is shared the caller should coordinate access.
func (d *dataUpdateTracker) load(ctx context.Context, drives ...string) {
	if len(drives) == 0 {
		logger.LogIf(ctx, errors.New("dataUpdateTracker.load: No local drives specified"))
		return
	}
	for _, drive := range drives {
		trackerPath := pathJoin(drive, dataUpdateTrackerFilename)
		f, err := OpenFile(trackerPath, readMode, 0o666)
		if err != nil {
			if !osIsNotExist(err) {
				logger.LogIf(ctx, err)
			}
			continue
		}
		switch err = d.deserialize(f, d.Saved); err {
		case nil, io.EOF, io.ErrUnexpectedEOF:
			// Nothing worth reporting.
		default:
			logger.LogIf(ctx, err)
		}
		f.Close()
	}
}
// startSaver will start a saver that will write d to all supplied drives at specific intervals.
// 'd' must be write locked when started and will be unlocked, also when no
// drives are supplied and the saver returns right away.
// The saver will save and exit when supplied context is closed.
// A save is also triggered by a message on d.save; nothing is written while
// the tracker is not dirty.
func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives ...string) {
	if len(drives) == 0 {
		// The caller handed over the lock; release it so other users of d do not block forever.
		d.mu.Unlock()
		return
	}

	saveNow := d.save
	exited := make(chan struct{})
	d.saveExited = exited
	d.mu.Unlock()
	t := time.NewTicker(interval)
	defer t.Stop()
	defer close(exited)
	var buf bytes.Buffer
	for {
		var exit bool
		select {
		case <-ctx.Done():
			// Do one last save before leaving.
			exit = true
		case <-t.C:
		case <-saveNow:
		}
		buf.Reset()
		d.mu.Lock()
		if !d.dirty {
			d.mu.Unlock()
			if exit {
				return
			}
			continue
		}
		d.Saved = UTCNow()
		err := d.serialize(&buf)
		if d.debug {
			console.Debugf(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v\n", buf.Len(), d.Current.idx)
		}
		d.dirty = false
		d.mu.Unlock()
		if err != nil {
			logger.LogIf(ctx, err, "Error serializing usage tracker data")
			if exit {
				return
			}
			continue
		}
		if buf.Len() == 0 {
			logger.LogIf(ctx, errors.New("zero sized output, skipping save"))
			if exit {
				return
			}
			continue
		}
		// Write the snapshot outside the lock; a failing drive does not stop the others.
		for _, drive := range drives {
			cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename)
			err := os.WriteFile(cacheFormatPath, buf.Bytes(), os.ModePerm)
			if err != nil {
				if osIsNotExist(err) {
					continue
				}
				logger.LogIf(ctx, err)
				continue
			}
		}
		if exit {
			return
		}
	}
}
// serialize writes the complete tracker state to dst.
// Layout: version byte, save timestamp, current index and filter, history
// count, then index and filter of every history entry. Integers are written
// as 8 byte little endian values.
// Caller should hold lock if d is expected to be shared.
// If an error is returned, there will likely be partial data written to dst.
func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) {
	ctx := GlobalContext
	w := bufio.NewWriter(dst)
	defer func() {
		// Only flush when everything was written successfully.
		if err == nil {
			err = w.Flush()
		}
	}()

	// report logs e when debugging is enabled and hands it back to the caller.
	report := func(e error) error {
		if d.debug {
			logger.LogIf(ctx, e)
		}
		return e
	}
	var scratch [8]byte
	// putUint64 writes v as 8 little endian bytes.
	putUint64 := func(v uint64) error {
		binary.LittleEndian.PutUint64(scratch[:], v)
		_, e := w.Write(scratch[:])
		return e
	}
	// writeFilter writes the index of f followed by its bloom filter.
	writeFilter := func(f dataUpdateFilter) error {
		if e := putUint64(f.idx); e != nil {
			return e
		}
		_, e := f.bf.WriteTo(w)
		return e
	}

	// Version
	if e := w.WriteByte(dataUpdateTrackerVersion); e != nil {
		return report(e)
	}
	// Timestamp.
	if e := putUint64(uint64(d.Saved.Unix())); e != nil {
		return report(e)
	}
	// Current
	if e := writeFilter(d.Current); e != nil {
		return report(e)
	}
	// History
	if e := putUint64(uint64(len(d.History))); e != nil {
		return report(e)
	}
	for _, entry := range d.History {
		if e := writeFilter(entry); e != nil {
			return report(e)
		}
	}
	return nil
}
// deserialize will deserialize the supplied input if the input is newer than the supplied time.
// Deprecated versions (1-6) and input that is not newer than newerThan are
// ignored and return nil without touching d. Unknown versions and read
// failures return an error, also without touching d.
// On success Current, History and Saved of d are replaced under d.mu; Saved
// becomes the timestamp stored in the input.
func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) error {
	ctx := GlobalContext
	var dst dataUpdateTracker
	var tmp [8]byte

	// Version
	if _, err := io.ReadFull(src, tmp[:1]); err != nil {
		if d.debug {
			if err != io.EOF {
				logger.LogIf(ctx, err)
			}
		}
		return err
	}
	switch tmp[0] {
	case 1, 2, 3, 4, 5, 6:
		if d.debug {
			console.Debugln(color.Green("dataUpdateTracker: ") + "deprecated data version, updating.")
		}
		return nil
	case dataUpdateTrackerVersion:
	default:
		return errors.New("dataUpdateTracker: Unknown data version")
	}

	// Timestamp.
	if _, err := io.ReadFull(src, tmp[:8]); err != nil {
		if d.debug {
			logger.LogIf(ctx, err)
		}
		return err
	}
	t := time.Unix(int64(binary.LittleEndian.Uint64(tmp[:])), 0)
	if !t.After(newerThan) {
		return nil
	}
	// Remember the stored timestamp; without it d.Saved would fall back to
	// the zero time and every later file would count as newer.
	dst.Saved = t

	// Current
	if _, err := io.ReadFull(src, tmp[:8]); err != nil {
		if d.debug {
			logger.LogIf(ctx, err)
		}
		return err
	}
	dst.Current.idx = binary.LittleEndian.Uint64(tmp[:])
	dst.Current.bf = emptyBloomFilter()
	if _, err := dst.Current.bf.ReadFrom(src); err != nil {
		if d.debug {
			logger.LogIf(ctx, err)
		}
		return err
	}

	// History
	if _, err := io.ReadFull(src, tmp[:8]); err != nil {
		if d.debug {
			logger.LogIf(ctx, err)
		}
		return err
	}
	n := binary.LittleEndian.Uint64(tmp[:])
	// Grow entry by entry instead of trusting the stored count for one big
	// allocation; a corrupt count then ends in a read error, not a panic.
	for i := uint64(0); i < n; i++ {
		var e dataUpdateFilter
		if _, err := io.ReadFull(src, tmp[:8]); err != nil {
			if d.debug {
				logger.LogIf(ctx, err)
			}
			return err
		}
		e.idx = binary.LittleEndian.Uint64(tmp[:])
		e.bf = emptyBloomFilter()
		if _, err := e.bf.ReadFrom(src); err != nil {
			if d.debug {
				logger.LogIf(ctx, err)
			}
			return err
		}
		dst.History = append(dst.History, e)
	}
	// Ignore what remains on the stream.
	// Update d:
	d.mu.Lock()
	defer d.mu.Unlock()
	d.Current = dst.Current
	d.History = dst.History
	d.Saved = dst.Saved
	return nil
}
// startCollector consumes paths from d.input and adds them to the current
// bloom filter until ctx is canceled.
// Paths without a bucket, or in a reserved/invalid bucket, are dropped.
// For accepted paths every prefix of the split path is added.
func (d *dataUpdateTracker) startCollector(ctx context.Context) {
	done := ctx.Done()
	for {
		var in string
		select {
		case <-done:
			return
		case in = <-d.input:
		}

		bucket, _ := path2BucketObjectWithBasePath("", in)
		if bucket == "" {
			if d.debug && len(in) > 0 {
				console.Debugf(color.Green("dataUpdateTracker:")+" no bucket (%s)\n", in)
			}
			continue
		}
		if isReservedOrInvalidBucket(bucket, false) {
			continue
		}

		parts := splitPathDeterministic(in)
		d.mu.Lock()
		// Add all paths until done.
		for n := 1; n <= len(parts); n++ {
			d.Current.bf.AddString(hashPath(path.Join(parts[:n]...)).String())
		}
		if len(parts) > 0 {
			d.dirty = true
		}
		d.mu.Unlock()
	}
}
// markDirty adds the supplied path to the current bloom filter.
// Calls without a bucket, or with a reserved/invalid bucket, are ignored
// regardless of the debug setting; debug only controls whether the missing
// bucket is reported.
// For accepted paths every prefix of the split bucket/prefix path is added
// and the tracker is flagged dirty.
func (d *dataUpdateTracker) markDirty(bucket, prefix string) {
	if bucket == "" {
		if d.debug {
			console.Debugf("%s no bucket specified\n", color.Green("dataUpdateTracker:"))
		}
		return
	}
	if isReservedOrInvalidBucket(bucket, false) {
		return
	}

	split := splitPathDeterministic(pathJoin(bucket, prefix))

	// Add all paths until done.
	d.mu.Lock()
	for i := range split {
		d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
	}
	d.dirty = d.dirty || len(split) > 0
	d.mu.Unlock()
}
// find returns the entry with the specified index.
// The result points at the entry stored in the history, not at a copy.
// Returns nil if not found.
func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter {
	for i := range d {
		if d[i].idx == idx {
			return &d[i]
		}
	}
	return nil
}
// filterFrom will return a combined bloom filter.
// The filters of every index from oldest to newest (inclusive) are merged
// into a new filter, which is returned serialized in the response.
// Complete is cleared when an index is missing or a merge fails.
// Returns nil if the merged filter cannot be serialized.
// The method reads d.Current and d.History without locking; the visible
// caller (cycleFilter) holds d.mu.
func (d *dataUpdateTracker) filterFrom(ctx context.Context, oldest, newest uint64) *bloomFilterResponse {
bf := d.newBloomFilter()
bfr := bloomFilterResponse{
OldestIdx: oldest,
CurrentIdx: d.Current.idx,
Complete: true,
}
// Loop through each index requested.
for idx := oldest; idx <= newest; idx++ {
v := d.History.find(idx)
if v == nil {
// Not in history: either this is the index currently being written, or it is missing.
if d.Current.idx == idx {
// Merge current.
err := bf.Merge(d.Current.bf.BloomFilter)
logger.LogIf(ctx, err)
if err != nil {
bfr.Complete = false
}
continue
}
// Missing index: the response can only cover what follows it.
bfr.Complete = false
bfr.OldestIdx = idx + 1
continue
}
err := bf.Merge(v.bf.BloomFilter)
if err != nil {
bfr.Complete = false
logger.LogIf(ctx, err)
continue
}
// Only indexes merged from history advance NewestIdx.
bfr.NewestIdx = idx
}
var dst bytes.Buffer
_, err := bf.WriteTo(&dst)
if err != nil {
logger.LogIf(ctx, err)
return nil
}
bfr.Filter = dst.Bytes()
return &bfr
}
// cycleFilter will cycle the bloom filter to start recording to index y if not already.
// The response will contain a bloom filter starting at index x up to, but not including index y.
// If y is 0, the response will not update y, but return the currently recorded information
// from the oldest (unless 0, then it will be all) until and including current y.
// Here x is req.Oldest and y is req.Current. When req.OldestClean is set the
// rest of the request is ignored and only OldestIdx is filled in.
// The returned error is always nil in this implementation.
func (d *dataUpdateTracker) cycleFilter(ctx context.Context, req bloomFilterRequest) (*bloomFilterResponse, error) {
if req.OldestClean != "" {
// latestWithDir does its own locking, so this runs before d.mu is taken below.
return &bloomFilterResponse{OldestIdx: d.latestWithDir(req.OldestClean)}, nil
}
current := req.Current
oldest := req.Oldest
d.mu.Lock()
defer d.mu.Unlock()
if current == 0 {
// Read-only request: report what is recorded without cycling.
if len(d.History) == 0 {
return d.filterFrom(ctx, d.Current.idx, d.Current.idx), nil
}
d.History.sort()
if oldest == 0 {
// History is sorted newest first, so the last entry is the oldest.
oldest = d.History[len(d.History)-1].idx
}
return d.filterFrom(ctx, oldest, d.Current.idx), nil
}
// Move current to history if new one requested
if d.Current.idx != current {
d.dirty = true
if d.debug {
console.Debugf(color.Green("dataUpdateTracker:")+" cycle bloom filter: %v -> %v\n", d.Current.idx, current)
}
d.History = append(d.History, d.Current)
d.Current.idx = current
d.Current.bf = d.newBloomFilter()
// Ask the saver for an immediate save; skip if the send would block.
select {
case d.save <- struct{}{}:
default:
}
}
d.History.removeOlderThan(oldest)
return d.filterFrom(ctx, oldest, current), nil
}
// splitPathDeterministic splits the provided relative path on the slash
// separator after passing it through decodeDirObject.
// Leading elements that are empty or "." are dropped, as are trailing empty
// elements; elements in between are kept as they are.
// Returns 0 length if no parts are found after trimming.
func splitPathDeterministic(in string) []string {
	parts := strings.Split(decodeDirObject(in), SlashSeparator)
	lo, hi := 0, len(parts)
	// Skip empty and "." elements at the front.
	for lo < hi && (parts[lo] == "" || parts[lo] == ".") {
		lo++
	}
	// Skip empty elements at the back.
	for hi > lo && parts[hi-1] == "" {
		hi--
	}
	return parts[lo:hi]
}
// bloomFilterRequest request bloom filters.
// Current index will be updated to current and entries back to Oldest is returned.
type bloomFilterRequest struct {
// Oldest index to include. cycleFilter drops history entries below it when
// Current is non-zero; with Current == 0 a zero value means the oldest history entry.
Oldest uint64
// Index to record to from now on. 0 leaves the current index unchanged.
Current uint64
// If set the oldest clean version will be returned in OldestIdx
// and the rest of the request will be ignored.
OldestClean string
}
// bloomFilterResponse is the result of a bloomFilterRequest, as produced by
// cycleFilter and filterFrom.
type bloomFilterResponse struct {
// Current index being written to.
CurrentIdx uint64
// Oldest index in the returned bloom filter.
OldestIdx uint64
// Newest Index in the returned bloom filter.
NewestIdx uint64
// Are all indexes between oldest and newest filled?
Complete bool
// Binary data of the bloom filter.
Filter []byte
}
// NSUpdated indicates namespace has been updated.
// The bucket and prefix are forwarded to markDirty of the package-level
// tracker; without a tracker the call does nothing.
func NSUpdated(bucket, prefix string) {
	tracker := intDataUpdateTracker
	if tracker == nil {
		return
	}
	tracker.markDirty(bucket, prefix)
}

View File

@ -1,299 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
"fmt"
"math/rand"
"os"
"path"
"path/filepath"
"sync"
"testing"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/logger/target/types"
"github.com/minio/pkg/logger/message/log"
)
// testLoggerI is the subset of the testing API that testingLogger needs.
// Both *testing.T and *testing.B are passed as one in this file.
type testLoggerI interface {
Helper()
Log(args ...interface{})
}
// testingLogger is a log target that forwards log entries to a test logger.
type testingLogger struct {
// mu guards t.
mu sync.Mutex
// t receives the entries; Send drops them once it has been set to nil.
t testLoggerI
}
// Endpoint returns an empty string.
func (t *testingLogger) Endpoint() string {
return ""
}
// String returns an empty string.
func (t *testingLogger) String() string {
return ""
}
// Init does nothing and never fails.
func (t *testingLogger) Init() error {
return nil
}
// Cancel does nothing.
func (t *testingLogger) Cancel() {
}
// Type reports the target as an HTTP target.
func (t *testingLogger) Type() types.TargetType {
return types.TargetHTTP
}
// IsOnline always reports true.
func (t *testingLogger) IsOnline() bool {
return true
}
// Stats returns the target statistics.
// This target keeps no statistics, so the result is always the zero value.
func (t *testingLogger) Stats() types.TargetStats {
return types.TargetStats{}
}
// Send forwards a log entry to the attached test logger.
// Entries are dropped silently once the logger has been detached. Anything
// that is not a log.Entry is rejected with an error.
func (t *testingLogger) Send(entry interface{}) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	sink := t.t
	if sink == nil {
		return nil
	}
	switch e := entry.(type) {
	case log.Entry:
		sink.Helper()
		sink.Log(e.Level, ":", e.Message)
		return nil
	default:
		return fmt.Errorf("unexpected log entry structure %#v", entry)
	}
}
// addTestingLogging registers a log target that writes to t.
// The returned function detaches t from the target, so entries sent
// afterwards are dropped.
func addTestingLogging(t testLoggerI) func() {
	target := &testingLogger{t: t}
	logger.AddSystemTarget(target)
	detach := func() {
		target.mu.Lock()
		target.t = nil
		target.mu.Unlock()
	}
	return detach
}
// TestDataUpdateTracker feeds paths to a tracker, cycles the filter, and then
// checks that the recorded entries survive a save, a reload and a second cycle.
func TestDataUpdateTracker(t *testing.T) {
	dut := newDataUpdateTracker()
	// Change some defaults.
	dut.debug = testing.Verbose()
	dut.input = make(chan string)
	dut.save = make(chan struct{})

	defer addTestingLogging(t)()

	dut.Current.bf = dut.newBloomFilter()

	tmpDir := t.TempDir()
	err := os.MkdirAll(filepath.Dir(filepath.Join(tmpDir, dataUpdateTrackerFilename)), os.ModePerm)
	if err != nil {
		t.Fatal(err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	dut.start(ctx, tmpDir)

	type testCase struct {
		in    string
		check []string // if not empty, check against these instead.
		exist bool
	}
	tests := []testCase{
		{
			in:    "bucket/directory/file.txt",
			check: []string{"bucket", "bucket/", "/bucket", "bucket/directory", "bucket/directory/", "bucket/directory/file.txt", "/bucket/directory/file.txt"},
			exist: true,
		},
		{
			// System bucket
			in:    ".minio.sys/ignoreme/pls",
			exist: false,
		},
		{
			// Not a valid bucket
			in:    "./bucket/okfile.txt",
			check: []string{"./bucket/okfile.txt", "/bucket/okfile.txt", "bucket/okfile.txt"},
			exist: false,
		},
		{
			// Not a valid bucket
			in:    "æ/okfile.txt",
			check: []string{"æ/okfile.txt", "æ/okfile.txt", "æ"},
			exist: false,
		},
		{
			in:    "/bucket2/okfile2.txt",
			check: []string{"./bucket2/okfile2.txt", "/bucket2/okfile2.txt", "bucket2/okfile2.txt", "bucket2"},
			exist: true,
		},
		{
			in:    "/bucket3/prefix/okfile2.txt",
			check: []string{"./bucket3/prefix/okfile2.txt", "/bucket3/prefix/okfile2.txt", "bucket3/prefix/okfile2.txt", "bucket3/prefix", "bucket3"},
			exist: true,
		},
	}

	// verify checks every probe of tc (or tc.in itself when there are none) against bf.
	verify := func(t *testing.T, bf bloomFilter, tc testCase) {
		t.Helper()
		probes := tc.check
		if len(probes) == 0 {
			probes = []string{tc.in}
		}
		for _, probe := range probes {
			if got := bf.containsDir(probe); got != tc.exist {
				// For unlimited tests this could lead to false positives,
				// but it should be deterministic.
				t.Errorf("entry %q, check: %q, got: %v, want %v", tc.in, probe, got, tc.exist)
			}
		}
	}

	for _, tt := range tests {
		t.Run(tt.in, func(t *testing.T) {
			dut.input <- tt.in
			dut.input <- "" // Sending empty string ensures the previous is added to filter.
			dut.mu.Lock()
			defer dut.mu.Unlock()
			verify(t, dut.Current.bf, tt)
		})
	}

	// Cycle to history
	req := bloomFilterRequest{
		Oldest:  1,
		Current: 2,
	}

	_, err = dut.cycleFilter(ctx, req)
	if err != nil {
		t.Fatal(err)
	}
	dut.input <- "cycle2/file.txt"
	dut.input <- "" // Sending empty string ensures the previous is added to filter.

	tests = append(tests, testCase{in: "cycle2/file.txt", exist: true})

	// Shut down
	cancel()
	<-dut.saveExited

	if dut.current() != 2 {
		t.Fatal("wrong current idx after save. want 2, got:", dut.current())
	}

	ctx, cancel = context.WithCancel(context.Background())

	// Reload...
	dut = newDataUpdateTracker()
	dut.start(ctx, tmpDir)
	defer func() {
		cancel()
		<-dut.saveExited
	}()

	if dut.current() != 2 {
		t.Fatal("current idx after load not preserved. want 2, got:", dut.current())
	}
	req = bloomFilterRequest{
		Oldest:  1,
		Current: 3,
	}
	bfr2, err := dut.cycleFilter(ctx, req)
	if err != nil {
		t.Fatal(err)
	}
	if !bfr2.Complete {
		t.Fatal("Wanted complete, didn't get it")
	}
	if bfr2.CurrentIdx != 3 {
		t.Fatal("wanted index 3, got", bfr2.CurrentIdx)
	}
	if bfr2.OldestIdx != 1 {
		t.Fatal("wanted oldest index 1, got", bfr2.OldestIdx)
	}

	t.Logf("Size of filter %d bytes, M: %d, K:%d", len(bfr2.Filter), dut.Current.bf.Cap(), dut.Current.bf.K())
	// Rerun test with returned bfr2
	bf := dut.newBloomFilter()
	_, err = bf.ReadFrom(bytes.NewReader(bfr2.Filter))
	if err != nil {
		t.Fatal(err)
	}
	for _, tt := range tests {
		t.Run(tt.in+"-reloaded", func(t *testing.T) {
			verify(t, bf, tt)
		})
	}
}
// BenchmarkDataUpdateTracker measures how fast the collector accepts paths
// over an unbuffered input channel. The paths are picked pseudo-randomly with
// a fixed seed from 1000 pre-generated entries of varying depth.
func BenchmarkDataUpdateTracker(b *testing.B) {
	tracker := newDataUpdateTracker()
	// Change some defaults.
	tracker.debug = false
	tracker.input = make(chan string)
	tracker.save = make(chan struct{})

	defer addTestingLogging(b)()

	tracker.Current.bf = tracker.newBloomFilter()
	// We do this unbuffered. This will very significantly reduce throughput, so this is a worst case.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go tracker.startCollector(ctx)

	rng := rand.New(rand.NewSource(0xabad1dea))
	segments := []string{"bucket", "aprefix", "nextprefixlevel", "maybeobjname", "evendeeper", "ok-one-morelevel", "final.object"}
	paths := make([]string, 1000)
	for i := range paths {
		depth := 1 + rng.Intn(cap(segments)-1)
		paths[i] = path.Join(segments[:depth]...)
	}
	b.SetBytes(1)
	b.ResetTimer()
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		tracker.input <- paths[rng.Intn(len(paths))]
	}
}

View File

@ -276,7 +276,6 @@ type dataUsageCacheInfo struct {
// indicates if the disk is being healed and scanner // indicates if the disk is being healed and scanner
// should skip healing the disk // should skip healing the disk
SkipHealing bool SkipHealing bool
BloomFilter []byte `msg:"BloomFilter,omitempty"`
// Active lifecycle, if any on the bucket // Active lifecycle, if any on the bucket
lifeCycle *lifecycle.Lifecycle `msg:"-"` lifeCycle *lifecycle.Lifecycle `msg:"-"`
@ -649,10 +648,7 @@ func (d *dataUsageCache) reduceChildrenOf(path dataUsageHash, limit int, compact
// StringAll returns a detailed string representation of all entries in the cache. // StringAll returns a detailed string representation of all entries in the cache.
func (d *dataUsageCache) StringAll() string { func (d *dataUsageCache) StringAll() string {
// Remove bloom filter from print. // Remove bloom filter from print.
bf := d.Info.BloomFilter
d.Info.BloomFilter = nil
s := fmt.Sprintf("info:%+v\n", d.Info) s := fmt.Sprintf("info:%+v\n", d.Info)
d.Info.BloomFilter = bf
for k, v := range d.Cache { for k, v := range d.Cache {
s += fmt.Sprintf("\t%v: %+v\n", k, v) s += fmt.Sprintf("\t%v: %+v\n", k, v)
} }

View File

@ -629,12 +629,6 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "SkipHealing") err = msgp.WrapError(err, "SkipHealing")
return return
} }
case "BloomFilter":
z.BloomFilter, err = dc.ReadBytes(z.BloomFilter)
if err != nil {
err = msgp.WrapError(err, "BloomFilter")
return
}
default: default:
err = dc.Skip() err = dc.Skip()
if err != nil { if err != nil {
@ -648,24 +642,9 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable // EncodeMsg implements msgp.Encodable
func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) { func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) {
// omitempty: check for empty values // map header, size 4
zb0001Len := uint32(5)
var zb0001Mask uint8 /* 5 bits */
_ = zb0001Mask
if z.BloomFilter == nil {
zb0001Len--
zb0001Mask |= 0x10
}
// variable map header, size zb0001Len
err = en.Append(0x80 | uint8(zb0001Len))
if err != nil {
return
}
if zb0001Len == 0 {
return
}
// write "Name" // write "Name"
err = en.Append(0xa4, 0x4e, 0x61, 0x6d, 0x65) err = en.Append(0x84, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
if err != nil { if err != nil {
return return
} }
@ -704,39 +683,15 @@ func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "SkipHealing") err = msgp.WrapError(err, "SkipHealing")
return return
} }
if (zb0001Mask & 0x10) == 0 { // if not empty
// write "BloomFilter"
err = en.Append(0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72)
if err != nil {
return
}
err = en.WriteBytes(z.BloomFilter)
if err != nil {
err = msgp.WrapError(err, "BloomFilter")
return
}
}
return return
} }
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) { func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// omitempty: check for empty values // map header, size 4
zb0001Len := uint32(5)
var zb0001Mask uint8 /* 5 bits */
_ = zb0001Mask
if z.BloomFilter == nil {
zb0001Len--
zb0001Mask |= 0x10
}
// variable map header, size zb0001Len
o = append(o, 0x80|uint8(zb0001Len))
if zb0001Len == 0 {
return
}
// string "Name" // string "Name"
o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65) o = append(o, 0x84, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = msgp.AppendString(o, z.Name) o = msgp.AppendString(o, z.Name)
// string "NextCycle" // string "NextCycle"
o = append(o, 0xa9, 0x4e, 0x65, 0x78, 0x74, 0x43, 0x79, 0x63, 0x6c, 0x65) o = append(o, 0xa9, 0x4e, 0x65, 0x78, 0x74, 0x43, 0x79, 0x63, 0x6c, 0x65)
@ -747,11 +702,6 @@ func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) {
// string "SkipHealing" // string "SkipHealing"
o = append(o, 0xab, 0x53, 0x6b, 0x69, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67) o = append(o, 0xab, 0x53, 0x6b, 0x69, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67)
o = msgp.AppendBool(o, z.SkipHealing) o = msgp.AppendBool(o, z.SkipHealing)
if (zb0001Mask & 0x10) == 0 { // if not empty
// string "BloomFilter"
o = append(o, 0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72)
o = msgp.AppendBytes(o, z.BloomFilter)
}
return return
} }
@ -797,12 +747,6 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "SkipHealing") err = msgp.WrapError(err, "SkipHealing")
return return
} }
case "BloomFilter":
z.BloomFilter, bts, err = msgp.ReadBytesBytes(bts, z.BloomFilter)
if err != nil {
err = msgp.WrapError(err, "BloomFilter")
return
}
default: default:
bts, err = msgp.Skip(bts) bts, err = msgp.Skip(bts)
if err != nil { if err != nil {
@ -817,7 +761,7 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *dataUsageCacheInfo) Msgsize() (s int) { func (z *dataUsageCacheInfo) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 10 + msgp.Uint32Size + 11 + msgp.TimeSize + 12 + msgp.BoolSize + 12 + msgp.BytesPrefixSize + len(z.BloomFilter) s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 10 + msgp.Uint32Size + 11 + msgp.TimeSize + 12 + msgp.BoolSize
return return
} }

View File

@ -24,7 +24,7 @@ import (
"io" "io"
"testing" "testing"
humanize "github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
) )
type badDisk struct{ StorageAPI } type badDisk struct{ StorageAPI }
@ -41,10 +41,6 @@ func (a badDisk) ReadFileStream(ctx context.Context, volume, path string, offset
return nil, errFaultyDisk return nil, errFaultyDisk
} }
// UpdateBloomFilter never produces a response; it unconditionally fails
// with errFaultyDisk, ignoring all of its arguments.
func (a badDisk) UpdateBloomFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) {
return nil, errFaultyDisk
}
func (a badDisk) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error { func (a badDisk) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error {
return errFaultyDisk return errFaultyDisk
} }

View File

@ -72,10 +72,6 @@ func (fi FileInfo) DataShardFixed() bool {
func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) ( func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
result madmin.HealResultItem, err error, result madmin.HealResultItem, err error,
) { ) {
if !opts.DryRun {
defer NSUpdated(bucket, slashSeparator)
}
storageDisks := er.getDisks() storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints() storageEndpoints := er.getEndpoints()
@ -754,7 +750,6 @@ func (er *erasureObjects) healObjectDir(ctx context.Context, bucket, object stri
}(index, disk) }(index, disk)
} }
wg.Wait() wg.Wait()
NSUpdated(bucket, object)
} }
} }

View File

@ -1216,7 +1216,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
if err != nil { if err != nil {
return oi, toObjectErr(err, bucket, object) return oi, toObjectErr(err, bucket, object)
} }
defer NSUpdated(bucket, object)
if !opts.Speedtest && versionsDisparity { if !opts.Speedtest && versionsDisparity {
listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity) listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity)

View File

@ -72,8 +72,6 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
return oi, NotImplemented{} return oi, NotImplemented{}
} }
defer NSUpdated(dstBucket, dstObject)
if !dstOpts.NoLock { if !dstOpts.NoLock {
lk := er.NewNSLock(dstBucket, dstObject) lk := er.NewNSLock(dstBucket, dstObject)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout) lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
@ -483,7 +481,6 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
if opts.VersionID != "" { if opts.VersionID != "" {
err = errFileVersionNotFound err = errFileVersionNotFound
} }
defer NSUpdated(bucket, object)
fi := FileInfo{ fi := FileInfo{
VersionID: m.VersionID, VersionID: m.VersionID,
@ -1271,8 +1268,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
defer NSUpdated(bucket, object)
for i := 0; i < len(onlineDisks); i++ { for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() { if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
// Object info is the same in all disks, so we can pick // Object info is the same in all disks, so we can pick
@ -1475,8 +1470,6 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
} else { } else {
errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName) errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName)
} }
defer NSUpdated(bucket, objects[objIndex].ObjectName)
} }
// Check failed deletes across multiple objects // Check failed deletes across multiple objects
@ -1568,9 +1561,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
} }
} }
objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
defer NSUpdated(bucket, object)
storageDisks := er.getDisks() storageDisks := er.getDisks()
versionFound := true versionFound := true
objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response. objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
@ -1940,7 +1930,6 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
if fi.TransitionStatus == lifecycle.TransitionComplete { if fi.TransitionStatus == lifecycle.TransitionComplete {
return nil return nil
} }
defer NSUpdated(bucket, object)
if fi.XLV1 { if fi.XLV1 {
if _, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{NoLock: true}); err != nil { if _, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{NoLock: true}); err != nil {

View File

@ -56,9 +56,6 @@ type erasureServerPools struct {
serverPools []*erasureSets serverPools []*erasureSets
// Shut down async operations
shutdown context.CancelFunc
// Active decommission canceler // Active decommission canceler
decommissionCancelers []context.CancelFunc decommissionCancelers []context.CancelFunc
@ -160,14 +157,7 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
break break
} }
drives := make([]string, 0, len(localDrives))
for _, localDrive := range localDrives {
drives = append(drives, localDrive.Endpoint().Path)
}
globalLocalDrives = localDrives globalLocalDrives = localDrives
ctx, z.shutdown = context.WithCancel(ctx)
go intDataUpdateTracker.start(ctx, drives...)
return z, nil return z, nil
} }
@ -524,8 +514,6 @@ func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object stri
} }
func (z *erasureServerPools) Shutdown(ctx context.Context) error { func (z *erasureServerPools) Shutdown(ctx context.Context) error {
defer z.shutdown()
g := errgroup.WithNErrs(len(z.serverPools)) g := errgroup.WithNErrs(len(z.serverPools))
for index := range z.serverPools { for index := range z.serverPools {
@ -594,7 +582,7 @@ func (z *erasureServerPools) StorageInfo(ctx context.Context) StorageInfo {
return globalNotificationSys.StorageInfo(z) return globalNotificationSys.StorageInfo(z)
} }
func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error { func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error {
// Updates must be closed before we return. // Updates must be closed before we return.
defer close(updates) defer close(updates)
@ -639,7 +627,7 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd
} }
}() }()
// Start scanner. Blocks until done. // Start scanner. Blocks until done.
err := erObj.nsScanner(ctx, allBuckets, bf, wantCycle, updates, healScanMode) err := erObj.nsScanner(ctx, allBuckets, wantCycle, updates, healScanMode)
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
mu.Lock() mu.Lock()
@ -712,8 +700,6 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd
// even if one of the sets fail to create buckets, we proceed all the successful // even if one of the sets fail to create buckets, we proceed all the successful
// operations. // operations.
func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
defer NSUpdated(bucket, slashSeparator)
// Verify if bucket is valid. // Verify if bucket is valid.
if !isMinioMetaBucketName(bucket) { if !isMinioMetaBucketName(bucket) {
if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil { if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil {
@ -1637,7 +1623,6 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, op
return BucketNameInvalid{Bucket: bucket} return BucketNameInvalid{Bucket: bucket}
} }
defer NSUpdated(bucket, slashSeparator)
if !opts.NoLock { if !opts.NoLock {
// Lock the bucket name before creating. // Lock the bucket name before creating.
lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck") lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck")

View File

@ -33,7 +33,6 @@ import (
"github.com/minio/minio/internal/dsync" "github.com/minio/minio/internal/dsync"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup" "github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/console"
) )
// list all errors that can be ignore in a bucket operation. // list all errors that can be ignore in a bucket operation.
@ -351,7 +350,7 @@ func (er erasureObjects) cleanupDeletedObjects(ctx context.Context) {
// nsScanner will start scanning buckets and send updated totals as they are traversed. // nsScanner will start scanning buckets and send updated totals as they are traversed.
// Updates are sent on a regular basis and the caller *must* consume them. // Updates are sent on a regular basis and the caller *must* consume them.
func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, wantCycle uint32, updates chan<- dataUsageCache, healScanMode madmin.HealScanMode) error { func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wantCycle uint32, updates chan<- dataUsageCache, healScanMode madmin.HealScanMode) error {
if len(buckets) == 0 { if len(buckets) == 0 {
return nil return nil
} }
@ -377,7 +376,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
}, },
Cache: make(map[string]dataUsageEntry, len(oldCache.Cache)), Cache: make(map[string]dataUsageEntry, len(oldCache.Cache)),
} }
bloom := bf.bytes()
// Put all buckets into channel. // Put all buckets into channel.
bucketCh := make(chan BucketInfo, len(buckets)) bucketCh := make(chan BucketInfo, len(buckets))
@ -473,7 +471,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
if cache.Info.Name == "" { if cache.Info.Name == "" {
cache.Info.Name = bucket.Name cache.Info.Name = bucket.Name
} }
cache.Info.BloomFilter = bloom
cache.Info.SkipHealing = healing cache.Info.SkipHealing = healing
cache.Info.NextCycle = wantCycle cache.Info.NextCycle = wantCycle
if cache.Info.Name != bucket.Name { if cache.Info.Name != bucket.Name {
@ -496,16 +493,12 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
Parent: dataUsageRoot, Parent: dataUsageRoot,
Entry: update, Entry: update,
} }
if intDataUpdateTracker.debug {
console.Debugln("z:", er.poolIndex, "s:", er.setIndex, "bucket", name, "got update", update)
}
} }
}(cache.Info.Name) }(cache.Info.Name)
// Calc usage // Calc usage
before := cache.Info.LastUpdate before := cache.Info.LastUpdate
var err error var err error
cache, err = disk.NSScanner(ctx, cache, updates, healScanMode) cache, err = disk.NSScanner(ctx, cache, updates, healScanMode)
cache.Info.BloomFilter = nil
if err != nil { if err != nil {
if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) { if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {
logger.LogIf(ctx, cache.save(ctx, er, cacheName)) logger.LogIf(ctx, cache.save(ctx, er, cacheName))

View File

@ -18,9 +18,7 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -29,7 +27,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/cespare/xxhash/v2" "github.com/cespare/xxhash/v2"
"github.com/klauspost/compress/zip" "github.com/klauspost/compress/zip"
"github.com/minio/madmin-go/v2" "github.com/minio/madmin-go/v2"
@ -384,75 +381,6 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE
return ng.Wait() return ng.Wait()
} }
// updateBloomFilter will cycle all servers to the current index and
// return a merged bloom filter if a complete one can be retrieved.
//
// The request sent to the local tracker and to every non-nil peer client asks
// for the range starting at current-dataUsageUpdateDirCycles (clamped to 0)
// up to current.
//
// The returned filter is nil whenever the local filter or any peer filter is
// incomplete, fails to load, or cannot be merged. Peers are still contacted in
// that case so that they are cycled as well. The returned error is always nil;
// failures are reported through the logger only.
func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint64) (*bloomFilter, error) {
	req := bloomFilterRequest{
		Current: current,
		Oldest:  current - dataUsageUpdateDirCycles,
	}
	if current < dataUsageUpdateDirCycles {
		// current is unsigned, so the subtraction above wrapped around.
		req.Oldest = 0
	}

	// Load initial state from local...
	var bf *bloomFilter
	bfr, err := intDataUpdateTracker.cycleFilter(ctx, req)
	logger.LogIf(ctx, err)
	if err == nil && bfr.Complete {
		nbf := intDataUpdateTracker.newBloomFilter()
		bf = &nbf
		_, err = bf.ReadFrom(bytes.NewReader(bfr.Filter))
		if err != nil {
			logger.LogIf(ctx, err)
			// A filter that failed to deserialize must not be used as the
			// merge base; treat it like any other failure below and
			// invalidate the result.
			bf = nil
		}
	}

	// mu guards bf, which is shared by all peer goroutines.
	var mu sync.Mutex
	g := errgroup.WithNErrs(len(sys.peerClients))
	for idx, client := range sys.peerClients {
		if client == nil {
			continue
		}
		client := client
		g.Go(func() error {
			serverBF, err := client.cycleServerBloomFilter(ctx, req)
			// Debug dump of the peer response; intentionally disabled.
			if false && intDataUpdateTracker.debug {
				b, _ := json.MarshalIndent(serverBF, "", "  ")
				logger.Info("Drive %v, Bloom filter: %v", client.host.Name, string(b))
			}
			// Keep lock while checking result.
			mu.Lock()
			defer mu.Unlock()

			// bf == nil means the local load or another peer already failed;
			// the result stays invalid.
			if err != nil || !serverBF.Complete || bf == nil {
				logger.LogOnceIf(ctx, err, client.host.String(), client.cycleServerBloomFilter)
				bf = nil
				return nil
			}

			var tmp bloom.BloomFilter
			_, err = tmp.ReadFrom(bytes.NewReader(serverBF.Filter))
			if err != nil {
				logger.LogIf(ctx, err)
				bf = nil
				return nil
			}
			if bf.BloomFilter == nil {
				// Nothing accumulated yet: adopt the peer filter as is.
				bf.BloomFilter = &tmp
				return nil
			}
			if err = bf.Merge(&tmp); err != nil {
				logger.LogIf(ctx, err)
				bf = nil
			}
			return nil
		}, idx)
	}
	g.Wait()
	return bf, nil
}
var errPeerNotReachable = errors.New("peer is not reachable") var errPeerNotReachable = errors.New("peer is not reachable")
// GetLocks - makes GetLocks RPC call on all peers. // GetLocks - makes GetLocks RPC call on all peers.

View File

@ -195,7 +195,7 @@ type ObjectLayer interface {
// Storage operations. // Storage operations.
Shutdown(context.Context) error Shutdown(context.Context) error
NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, scanMode madmin.HealScanMode) error NSScanner(ctx context.Context, updates chan<- DataUsageInfo, wantCycle uint32, scanMode madmin.HealScanMode) error
BackendInfo() madmin.BackendInfo BackendInfo() madmin.BackendInfo
StorageInfo(ctx context.Context) StorageInfo StorageInfo(ctx context.Context) StorageInfo
LocalStorageInfo(ctx context.Context) StorageInfo LocalStorageInfo(ctx context.Context) StorageInfo

View File

@ -309,25 +309,6 @@ func (client *peerRESTClient) DeleteBucketMetadata(bucket string) error {
return nil return nil
} }
// cycleServerBloomFilter will cycle the bloom filter to start recording to index y if not already.
// The response will contain a bloom filter starting at index x up to, but not including index y.
// If y is 0, the response will not update y, but return the currently recorded information
// from the current x to y-1.
// NOTE(review): x and y presumably correspond to req.Oldest and req.Current - confirm
// against bloomFilterRequest.
//
// The request is sent gob-encoded via peerRESTMethodCycleBloom and the reply
// is gob-decoded into the returned response. When decoding fails, the
// (possibly partially filled) response is returned together with the error.
func (client *peerRESTClient) cycleServerBloomFilter(ctx context.Context, req bloomFilterRequest) (*bloomFilterResponse, error) {
	payload := new(bytes.Buffer)
	if encErr := gob.NewEncoder(payload).Encode(req); encErr != nil {
		return nil, encErr
	}

	body, callErr := client.callWithContext(ctx, peerRESTMethodCycleBloom, nil, payload, -1)
	if callErr != nil {
		return nil, callErr
	}
	defer xhttp.DrainBody(body)

	out := &bloomFilterResponse{}
	decErr := gob.NewDecoder(body).Decode(out)
	return out, decErr
}
// DeletePolicy - delete a specific canned policy. // DeletePolicy - delete a specific canned policy.
func (client *peerRESTClient) DeletePolicy(policyName string) (err error) { func (client *peerRESTClient) DeletePolicy(policyName string) (err error) {
values := make(url.Values) values := make(url.Values)

View File

@ -18,7 +18,7 @@
package cmd package cmd
const ( const (
peerRESTVersion = "v29" // Added LocalStorageInfo peer API peerRESTVersion = "v30" // Removed bloom filter
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer" peerRESTPrefix = minioReservedBucketPath + "/peer"

View File

@ -631,30 +631,6 @@ func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *htt
} }
} }
// CycleServerBloomFilterHandler cycles bloom filter on server.
//
// It gob-decodes a bloomFilterRequest from the request body, hands it to
// intDataUpdateTracker.cycleFilter and writes the gob-encoded result to the
// response. Invalid requests, decode failures and cycleFilter failures are
// answered through writeErrorResponse; a failure while encoding the reply is
// only logged.
func (s *peerRESTServer) CycleServerBloomFilterHandler(w http.ResponseWriter, r *http.Request) {
	if valid := s.IsValid(w, r); !valid {
		s.writeErrorResponse(w, errors.New("Invalid request"))
		return
	}

	ctx := newContext(r, w, "CycleServerBloomFilter")

	var cycleReq bloomFilterRequest
	if decErr := gob.NewDecoder(r.Body).Decode(&cycleReq); decErr != nil {
		s.writeErrorResponse(w, decErr)
		return
	}

	resp, cycleErr := intDataUpdateTracker.cycleFilter(ctx, cycleReq)
	if cycleErr != nil {
		s.writeErrorResponse(w, cycleErr)
		return
	}

	encErr := gob.NewEncoder(w).Encode(resp)
	logger.LogIf(ctx, encErr)
}
func (s *peerRESTServer) GetMetacacheListingHandler(w http.ResponseWriter, r *http.Request) { func (s *peerRESTServer) GetMetacacheListingHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) { if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request")) s.writeErrorResponse(w, errors.New("Invalid request"))
@ -1390,7 +1366,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodOsInfo).HandlerFunc(httpTraceHdrs(server.GetOSInfoHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodOsInfo).HandlerFunc(httpTraceHdrs(server.GetOSInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDiskHwInfo).HandlerFunc(httpTraceHdrs(server.GetPartitionsHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDiskHwInfo).HandlerFunc(httpTraceHdrs(server.GetPartitionsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCPUInfo).HandlerFunc(httpTraceHdrs(server.GetCPUsHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCPUInfo).HandlerFunc(httpTraceHdrs(server.GetCPUsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetAllBucketStats).HandlerFunc(httpTraceHdrs(server.GetAllBucketStatsHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetAllBucketStats).HandlerFunc(httpTraceHdrs(server.GetAllBucketStatsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)

View File

@ -41,11 +41,9 @@ import (
"github.com/klauspost/filepathx" "github.com/klauspost/filepathx"
"github.com/minio/madmin-go/v2" "github.com/minio/madmin-go/v2"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/disk" "github.com/minio/minio/internal/disk"
xioutil "github.com/minio/minio/internal/ioutil" xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/pkg/console"
"github.com/zeebo/xxh3" "github.com/zeebo/xxh3"
) )
@ -448,9 +446,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
lc, err = globalLifecycleSys.Get(cache.Info.Name) lc, err = globalLifecycleSys.Get(cache.Info.Name)
if err == nil && lc.HasActiveRules("") { if err == nil && lc.HasActiveRules("") {
cache.Info.lifeCycle = lc cache.Info.lifeCycle = lc
if intDataUpdateTracker.debug {
console.Debugln(color.Green("scannerDisk:") + " lifecycle: Active rules found")
}
} }
} }
@ -463,10 +458,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
Config: rcfg, Config: rcfg,
remotes: tgts, remotes: tgts,
} }
if intDataUpdateTracker.debug {
console.Debugln(color.Green("scannerDisk:") + " replication: Active rules found")
}
} }
} }
} }
@ -502,9 +493,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
doneSz(len(buf)) doneSz(len(buf))
res["metasize"] = fmt.Sprint(len(buf)) res["metasize"] = fmt.Sprint(len(buf))
if err != nil { if err != nil {
if intDataUpdateTracker.debug { res["err"] = err.Error()
console.Debugf(color.Green("scannerBucket:")+" object path missing: %v: %w\n", item.Path, err)
}
return sizeSummary{}, errSkipFile return sizeSummary{}, errSkipFile
} }
defer metaDataPoolPut(buf) defer metaDataPoolPut(buf)
@ -514,9 +503,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
fivs, err := getFileInfoVersions(buf, item.bucket, item.objectPath()) fivs, err := getFileInfoVersions(buf, item.bucket, item.objectPath())
if err != nil { if err != nil {
if intDataUpdateTracker.debug { res["err"] = err.Error()
console.Debugf(color.Green("scannerBucket:")+" reading xl.meta failed: %v: %w\n", item.Path, err)
}
return sizeSummary{}, errSkipFile return sizeSummary{}, errSkipFile
} }
@ -531,9 +518,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
done() done()
if err != nil { if err != nil {
if intDataUpdateTracker.debug { res["err"] = err.Error()
console.Debugf(color.Green("scannerBucket:")+" applying version actions failed: %v: %w\n", item.Path, err)
}
return sizeSummary{}, errSkipFile return sizeSummary{}, errSkipFile
} }
@ -603,7 +588,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
} }
} }
} }
return sizeS, nil return sizeS, nil
}, scanMode) }, scanMode)
if err != nil { if err != nil {