mirror of https://github.com/minio/minio.git
feat: implement in-progress partial bucket updates (#12279)
This commit is contained in:
parent
866593fd94
commit
2ca9c533ef
|
@ -178,17 +178,23 @@ type cachedFolder struct {
|
|||
}
|
||||
|
||||
type folderScanner struct {
|
||||
root string
|
||||
getSize getSizeFn
|
||||
oldCache dataUsageCache
|
||||
newCache dataUsageCache
|
||||
withFilter *bloomFilter
|
||||
root string
|
||||
getSize getSizeFn
|
||||
oldCache dataUsageCache
|
||||
newCache dataUsageCache
|
||||
updateCache dataUsageCache
|
||||
withFilter *bloomFilter
|
||||
|
||||
dataUsageScannerDebug bool
|
||||
healFolderInclude uint32 // Include a clean folder one in n cycles.
|
||||
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
|
||||
|
||||
disks []StorageAPI
|
||||
|
||||
// If set updates will be sent regularly to this channel.
|
||||
// Will not be closed when returned.
|
||||
updates chan<- dataUsageEntry
|
||||
lastUpdate time.Time
|
||||
}
|
||||
|
||||
// Cache structure and compaction:
|
||||
|
@ -255,9 +261,11 @@ func scanDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
|||
getSize: getSize,
|
||||
oldCache: cache,
|
||||
newCache: dataUsageCache{Info: cache.Info},
|
||||
updateCache: dataUsageCache{Info: cache.Info},
|
||||
dataUsageScannerDebug: intDataUpdateTracker.debug,
|
||||
healFolderInclude: 0,
|
||||
healObjectSelect: 0,
|
||||
updates: cache.Info.updates,
|
||||
}
|
||||
|
||||
// Add disks for set healing.
|
||||
|
@ -318,6 +326,22 @@ func scanDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
|||
return s.newCache, nil
|
||||
}
|
||||
|
||||
// sendUpdate() should be called on a regular basis when the newCache contains more recent total than previously.
|
||||
// May or may not send an update upstream.
|
||||
func (f *folderScanner) sendUpdate() {
|
||||
// Send at most an update every minute.
|
||||
if f.updates == nil || time.Since(f.lastUpdate) < time.Minute {
|
||||
return
|
||||
}
|
||||
if flat := f.updateCache.sizeRecursive(f.newCache.Info.Name); flat != nil {
|
||||
select {
|
||||
case f.updates <- *flat:
|
||||
default:
|
||||
}
|
||||
f.lastUpdate = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// scanFolder will scan the provided folder.
|
||||
// Files found in the folders will be added to f.newCache.
|
||||
// If final is provided folders will be put into f.newFolders or f.existingFolders.
|
||||
|
@ -328,6 +352,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||
thisHash := hashPath(folder.name)
|
||||
// Store initial compaction state.
|
||||
wasCompacted := into.Compacted
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
|
@ -358,6 +383,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||
if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
|
||||
if f.healObjectSelect == 0 || !thisHash.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
|
||||
f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
|
||||
f.updateCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
|
||||
if f.dataUsageScannerDebug {
|
||||
console.Debugf(scannerLogPrefix+" Skipping non-updated folder: %v\n", folder.name)
|
||||
}
|
||||
|
@ -412,6 +438,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||
delete(abandonedChildren, h.Key()) // h.Key() already accounted for.
|
||||
if exists {
|
||||
existingFolders = append(existingFolders, this)
|
||||
f.updateCache.copyWithChildren(&f.oldCache, h, &thisHash)
|
||||
} else {
|
||||
newFolders = append(newFolders, this)
|
||||
}
|
||||
|
@ -497,11 +524,44 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||
if !into.Compacted {
|
||||
into.addChild(dataUsageHash(folder.name))
|
||||
}
|
||||
// We scanned a folder, optionally send update.
|
||||
f.sendUpdate()
|
||||
}
|
||||
|
||||
// Scan new...
|
||||
for _, folder := range newFolders {
|
||||
h := hashPath(folder.name)
|
||||
// Add new folders to the update tree so totals update for these.
|
||||
if !into.Compacted {
|
||||
var foundAny bool
|
||||
parent := thisHash
|
||||
for parent != hashPath(f.updateCache.Info.Name) {
|
||||
e := f.updateCache.find(parent.Key())
|
||||
if e == nil || e.Compacted {
|
||||
foundAny = true
|
||||
break
|
||||
}
|
||||
if next := f.updateCache.searchParent(parent); next == nil {
|
||||
foundAny = true
|
||||
break
|
||||
} else {
|
||||
parent = *next
|
||||
}
|
||||
}
|
||||
if !foundAny {
|
||||
// Add non-compacted empty entry.
|
||||
f.updateCache.replaceHashed(h, &thisHash, dataUsageEntry{})
|
||||
}
|
||||
}
|
||||
scanFolder(folder)
|
||||
// Add new folders if this is new and we don't have existing.
|
||||
if !into.Compacted {
|
||||
parent := f.updateCache.find(thisHash.Key())
|
||||
if parent != nil && !parent.Compacted {
|
||||
f.updateCache.deleteRecursive(h)
|
||||
f.updateCache.copyWithChildren(&f.newCache, h, &thisHash)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Scan existing...
|
||||
|
@ -512,7 +572,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||
// and the entry itself is compacted.
|
||||
if !into.Compacted && f.oldCache.isCompacted(h) {
|
||||
if !h.mod(f.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
|
||||
if !h.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
|
||||
if f.healObjectSelect == 0 || !h.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
|
||||
// Transfer and add as child...
|
||||
f.newCache.copyWithChildren(&f.oldCache, h, folder.parent)
|
||||
into.addChild(h)
|
||||
|
@ -733,6 +793,13 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||
if !into.Compacted {
|
||||
f.newCache.reduceChildrenOf(thisHash, dataScannerCompactAtChildren, f.newCache.Info.Name != folder.name)
|
||||
}
|
||||
if _, ok := f.updateCache.Cache[thisHash.Key()]; !wasCompacted && ok {
|
||||
// Replace if existed before.
|
||||
if flat := f.newCache.sizeRecursive(thisHash.Key()); flat != nil {
|
||||
f.updateCache.deleteRecursive(thisHash)
|
||||
f.updateCache.replaceHashed(thisHash, folder.parent, *flat)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -153,8 +153,15 @@ type dataUsageCacheInfo struct {
|
|||
// indicates if the disk is being healed and scanner
|
||||
// should skip healing the disk
|
||||
SkipHealing bool
|
||||
BloomFilter []byte `msg:"BloomFilter,omitempty"`
|
||||
lifeCycle *lifecycle.Lifecycle `msg:"-"`
|
||||
BloomFilter []byte `msg:"BloomFilter,omitempty"`
|
||||
|
||||
// Active lifecycle, if any on the bucket
|
||||
lifeCycle *lifecycle.Lifecycle `msg:"-"`
|
||||
|
||||
// optional updates channel.
|
||||
// If set updates will be sent regularly to this channel.
|
||||
// Will not be closed when returned.
|
||||
updates chan<- dataUsageEntry `msg:"-"`
|
||||
}
|
||||
|
||||
func (e *dataUsageEntry) addSizes(summary sizeSummary) {
|
||||
|
@ -259,6 +266,31 @@ func (d *dataUsageCache) findChildrenCopy(h dataUsageHash) dataUsageHashMap {
|
|||
return res
|
||||
}
|
||||
|
||||
// searchParent will search for the parent of h.
|
||||
// This is an O(N*N) operation if there is no parent or it cannot be guessed.
|
||||
func (d *dataUsageCache) searchParent(h dataUsageHash) *dataUsageHash {
|
||||
want := h.Key()
|
||||
if idx := strings.LastIndexByte(want, '/'); idx >= 0 {
|
||||
if v := d.find(want[:idx]); v != nil {
|
||||
for child := range v.Children {
|
||||
if child == want {
|
||||
found := hashPath(want[:idx])
|
||||
return &found
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for k, v := range d.Cache {
|
||||
for child := range v.Children {
|
||||
if child == want {
|
||||
found := dataUsageHash(k)
|
||||
return &found
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns nil if not found.
|
||||
func (d *dataUsageCache) subCache(path string) dataUsageCache {
|
||||
dst := dataUsageCache{Info: dataUsageCacheInfo{
|
||||
|
@ -281,6 +313,15 @@ func (d *dataUsageCache) deleteRecursive(h dataUsageHash) {
|
|||
}
|
||||
}
|
||||
|
||||
// deleteChildren will delete any children, but not the entry itself.
|
||||
func (d *dataUsageCache) deleteChildren(h dataUsageHash) {
|
||||
if existing, ok := d.Cache[h.String()]; ok {
|
||||
for child := range existing.Children {
|
||||
d.deleteRecursive(dataUsageHash(child))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// replaceRootChild will replace the child of root in d with the root of 'other'.
|
||||
func (d *dataUsageCache) replaceRootChild(other dataUsageCache) {
|
||||
otherRoot := other.root()
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/bpool"
|
||||
"github.com/minio/minio/pkg/color"
|
||||
"github.com/minio/minio/pkg/console"
|
||||
"github.com/minio/minio/pkg/dsync"
|
||||
"github.com/minio/minio/pkg/sync/errgroup"
|
||||
)
|
||||
|
@ -475,11 +476,28 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
|
|||
NextCycle: 0,
|
||||
}
|
||||
}
|
||||
// Collect updates.
|
||||
updates := make(chan dataUsageEntry, 1)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for update := range updates {
|
||||
bucketResults <- dataUsageEntryInfo{
|
||||
Name: cache.Info.Name,
|
||||
Parent: dataUsageRoot,
|
||||
Entry: update,
|
||||
}
|
||||
if intDataUpdateTracker.debug {
|
||||
console.Debugln("bucket", bucket.Name, "got update", update)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Calc usage
|
||||
before := cache.Info.LastUpdate
|
||||
var err error
|
||||
cache, err = disk.NSScanner(ctx, cache)
|
||||
cache, err = disk.NSScanner(ctx, cache, updates)
|
||||
cache.Info.BloomFilter = nil
|
||||
if err != nil {
|
||||
if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {
|
||||
|
@ -490,6 +508,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
|
|||
continue
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
var root dataUsageEntry
|
||||
if r := cache.root(); r != nil {
|
||||
root = cache.flatten(*r)
|
||||
|
|
18
cmd/fs-v1.go
18
cmd/fs-v1.go
|
@ -239,6 +239,7 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) {
|
|||
|
||||
// NSScanner returns data usage stats of the current FS deployment
|
||||
func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error {
|
||||
defer close(updates)
|
||||
// Load bucket totals
|
||||
var totalCache dataUsageCache
|
||||
err := totalCache.load(ctx, fs, dataUsageCacheName)
|
||||
|
@ -273,7 +274,21 @@ func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates cha
|
|||
bCache.Info.Name = b.Name
|
||||
}
|
||||
bCache.Info.BloomFilter = totalCache.Info.BloomFilter
|
||||
|
||||
upds := make(chan dataUsageEntry, 1)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for update := range upds {
|
||||
totalCache.replace(b.Name, dataUsageRoot, update)
|
||||
if intDataUpdateTracker.debug {
|
||||
logger.Info(color.Green("NSScanner:")+" Got update:", len(totalCache.Cache))
|
||||
}
|
||||
cloned := totalCache.clone()
|
||||
updates <- cloned.dui(dataUsageRoot, buckets)
|
||||
}
|
||||
}()
|
||||
bCache.Info.updates = upds
|
||||
cache, err := fs.scanBucket(ctx, b.Name, bCache)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -282,6 +297,7 @@ func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates cha
|
|||
}
|
||||
logger.LogIf(ctx, err)
|
||||
cache.Info.BloomFilter = nil
|
||||
wg.Wait()
|
||||
|
||||
if cache.root() == nil {
|
||||
if intDataUpdateTracker.debug {
|
||||
|
|
|
@ -110,8 +110,8 @@ func (d *naughtyDisk) SetDiskID(id string) {
|
|||
d.disk.SetDiskID(id)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache) (info dataUsageCache, err error) {
|
||||
return d.disk.NSScanner(ctx, cache)
|
||||
func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (info dataUsageCache, err error) {
|
||||
return d.disk.NSScanner(ctx, cache, updates)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
|
||||
|
|
|
@ -43,7 +43,7 @@ type StorageAPI interface {
|
|||
Healing() *healingTracker // Returns nil if disk is not healing.
|
||||
|
||||
DiskInfo(ctx context.Context) (info DiskInfo, err error)
|
||||
NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error)
|
||||
NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error)
|
||||
|
||||
// Volume operations.
|
||||
MakeVol(ctx context.Context, volume string) (err error)
|
||||
|
|
|
@ -206,7 +206,8 @@ func (client *storageRESTClient) Healing() *healingTracker {
|
|||
return val.(*healingTracker)
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
|
||||
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
|
||||
defer close(updates)
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
pw.CloseWithError(cache.serializeTo(pw))
|
||||
|
@ -218,14 +219,38 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
|
|||
return cache, err
|
||||
}
|
||||
|
||||
var newCache dataUsageCache
|
||||
pr, pw = io.Pipe()
|
||||
rr, rw := io.Pipe()
|
||||
go func() {
|
||||
pw.CloseWithError(waitForHTTPStream(respBody, pw))
|
||||
rw.CloseWithError(waitForHTTPStream(respBody, rw))
|
||||
}()
|
||||
err = newCache.deserialize(pr)
|
||||
pr.CloseWithError(err)
|
||||
return newCache, err
|
||||
|
||||
ms := msgp.NewReader(rr)
|
||||
for {
|
||||
// Read whether it is an update.
|
||||
upd, err := ms.ReadBool()
|
||||
if err != nil {
|
||||
rr.CloseWithError(err)
|
||||
return cache, err
|
||||
}
|
||||
if !upd {
|
||||
// No more updates... New cache follows.
|
||||
break
|
||||
}
|
||||
var update dataUsageEntry
|
||||
err = update.DecodeMsg(ms)
|
||||
if err != nil || err == io.EOF {
|
||||
rr.CloseWithError(err)
|
||||
return cache, err
|
||||
}
|
||||
updates <- update
|
||||
}
|
||||
var newCache dataUsageCache
|
||||
err = newCache.DecodeMsg(ms)
|
||||
rr.CloseWithError(err)
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
return cache, err
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) GetDiskID() (string, error) {
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v33" // Added transition related information to FileInfo
|
||||
storageRESTVersion = "v34" // Streaming Usage Updates
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
|
|
|
@ -19,6 +19,7 @@ package cmd
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/gob"
|
||||
"encoding/hex"
|
||||
|
@ -174,13 +175,43 @@ func (s *storageRESTServer) NSScannerHandler(w http.ResponseWriter, r *http.Requ
|
|||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(r.Context())
|
||||
defer cancel()
|
||||
resp := streamHTTPResponse(w)
|
||||
usageInfo, err := s.storage.NSScanner(r.Context(), cache)
|
||||
respW := msgp.NewWriter(resp)
|
||||
|
||||
// Collect updates, stream them before the full cache is sent.
|
||||
updates := make(chan dataUsageEntry, 1)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for update := range updates {
|
||||
// Write true bool to indicate update.
|
||||
if err = respW.WriteBool(true); err == nil {
|
||||
err = update.EncodeMsg(respW)
|
||||
}
|
||||
respW.Flush()
|
||||
if err != nil {
|
||||
cancel()
|
||||
resp.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
usageInfo, err := s.storage.NSScanner(ctx, cache, updates)
|
||||
if err != nil {
|
||||
respW.Flush()
|
||||
resp.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
resp.CloseWithError(usageInfo.serializeTo(resp))
|
||||
|
||||
// Write false bool to indicate we finished.
|
||||
wg.Wait()
|
||||
if err = respW.WriteBool(false); err == nil {
|
||||
err = usageInfo.EncodeMsg(respW)
|
||||
}
|
||||
resp.CloseWithError(respW.Flush())
|
||||
}
|
||||
|
||||
// MakeVolHandler - make a volume.
|
||||
|
|
|
@ -25,7 +25,7 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
ewma "github.com/VividCortex/ewma"
|
||||
"github.com/VividCortex/ewma"
|
||||
"github.com/minio/madmin-go"
|
||||
)
|
||||
|
||||
|
@ -158,7 +158,7 @@ func (p *xlStorageDiskIDCheck) Healing() *healingTracker {
|
|||
return p.storage.Healing()
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
|
||||
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return dataUsageCache{}, ctx.Err()
|
||||
|
@ -168,7 +168,7 @@ func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCac
|
|||
if err := p.checkDiskStale(); err != nil {
|
||||
return dataUsageCache{}, err
|
||||
}
|
||||
return p.storage.NSScanner(ctx, cache)
|
||||
return p.storage.NSScanner(ctx, cache, updates)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
|
||||
|
|
|
@ -384,7 +384,9 @@ func (s *xlStorage) Healing() *healingTracker {
|
|||
return &h
|
||||
}
|
||||
|
||||
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
|
||||
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
|
||||
// Updates must be closed before we return.
|
||||
defer close(updates)
|
||||
var lc *lifecycle.Lifecycle
|
||||
var err error
|
||||
|
||||
|
@ -409,6 +411,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUs
|
|||
globalHealConfigMu.Lock()
|
||||
healOpts := globalHealConfig
|
||||
globalHealConfigMu.Unlock()
|
||||
cache.Info.updates = updates
|
||||
|
||||
dataUsageInfo, err := scanDataFolder(ctx, s.diskPath, cache, func(item scannerItem) (sizeSummary, error) {
|
||||
// Look for `xl.meta/xl.json' at the leaf.
|
||||
|
|
Loading…
Reference in New Issue