2021-04-18 15:41:13 -04:00
|
|
|
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
|
|
//
|
|
|
|
// This file is part of MinIO Object Storage stack
|
|
|
|
//
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
//
|
|
|
|
// This program is distributed in the hope that it will be useful
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU Affero General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
2020-04-27 13:06:21 -04:00
|
|
|
|
|
|
|
package cmd
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bufio"
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"encoding/binary"
|
|
|
|
"errors"
|
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
|
|
|
"os"
|
|
|
|
"path"
|
|
|
|
"sort"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2021-05-27 11:02:39 -04:00
|
|
|
"github.com/bits-and-blooms/bloom/v3"
|
2021-06-01 17:59:40 -04:00
|
|
|
"github.com/minio/minio/internal/color"
|
|
|
|
"github.com/minio/minio/internal/logger"
|
2021-05-28 18:17:01 -04:00
|
|
|
"github.com/minio/pkg/console"
|
2020-04-27 13:06:21 -04:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
// Estimate bloom filter size. With this many items
|
2021-05-17 11:31:04 -04:00
|
|
|
dataUpdateTrackerEstItems = 200000
|
2020-04-27 13:06:21 -04:00
|
|
|
// ... we want this false positive rate:
|
2021-05-17 11:31:04 -04:00
|
|
|
dataUpdateTrackerFP = 0.1
|
2020-10-30 12:33:16 -04:00
|
|
|
dataUpdateTrackerQueueSize = 0
|
2020-04-27 13:06:21 -04:00
|
|
|
|
2020-04-28 04:16:57 -04:00
|
|
|
dataUpdateTrackerFilename = dataUsageBucket + SlashSeparator + ".tracker.bin"
|
2021-08-25 11:25:26 -04:00
|
|
|
dataUpdateTrackerVersion = 7
|
2020-04-27 13:06:21 -04:00
|
|
|
dataUpdateTrackerSaveInterval = 5 * time.Minute
|
|
|
|
)
|
|
|
|
|
2021-05-17 11:31:04 -04:00
|
|
|
var intDataUpdateTracker *dataUpdateTracker
|
2020-04-27 13:06:21 -04:00
|
|
|
|
|
|
|
func init() {
|
|
|
|
intDataUpdateTracker = newDataUpdateTracker()
|
|
|
|
}
|
|
|
|
|
|
|
|
type dataUpdateTracker struct {
|
|
|
|
mu sync.Mutex
|
|
|
|
input chan string
|
|
|
|
save chan struct{}
|
|
|
|
debug bool
|
|
|
|
saveExited chan struct{}
|
2020-05-15 00:46:36 -04:00
|
|
|
dirty bool
|
2020-04-27 13:06:21 -04:00
|
|
|
|
|
|
|
Current dataUpdateFilter
|
|
|
|
History dataUpdateTrackerHistory
|
|
|
|
Saved time.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
// newDataUpdateTracker returns a dataUpdateTracker with default settings.
|
|
|
|
func newDataUpdateTracker() *dataUpdateTracker {
|
|
|
|
d := &dataUpdateTracker{
|
|
|
|
Current: dataUpdateFilter{
|
|
|
|
idx: 1,
|
|
|
|
},
|
2021-04-03 12:03:42 -04:00
|
|
|
debug: serverDebugLog,
|
2020-04-27 13:06:21 -04:00
|
|
|
input: make(chan string, dataUpdateTrackerQueueSize),
|
|
|
|
save: make(chan struct{}, 1),
|
|
|
|
saveExited: make(chan struct{}),
|
|
|
|
}
|
|
|
|
d.Current.bf = d.newBloomFilter()
|
2020-05-15 00:46:36 -04:00
|
|
|
d.dirty = true
|
2020-04-27 13:06:21 -04:00
|
|
|
return d
|
|
|
|
}
|
|
|
|
|
|
|
|
type dataUpdateTrackerHistory []dataUpdateFilter
|
|
|
|
|
|
|
|
type dataUpdateFilter struct {
|
|
|
|
idx uint64
|
|
|
|
bf bloomFilter
|
|
|
|
}
|
|
|
|
|
|
|
|
type bloomFilter struct {
|
|
|
|
*bloom.BloomFilter
|
|
|
|
}
|
|
|
|
|
|
|
|
// emptyBloomFilter returns an empty bloom filter.
|
|
|
|
func emptyBloomFilter() bloomFilter {
|
|
|
|
return bloomFilter{BloomFilter: &bloom.BloomFilter{}}
|
|
|
|
}
|
|
|
|
|
|
|
|
// containsDir returns whether the bloom filter contains a directory.
|
|
|
|
// Note that objects in XL mode are also considered directories.
|
|
|
|
func (b bloomFilter) containsDir(in string) bool {
|
|
|
|
split := splitPathDeterministic(path.Clean(in))
|
|
|
|
|
|
|
|
if len(split) == 0 {
|
|
|
|
return false
|
|
|
|
}
|
2020-06-12 13:28:21 -04:00
|
|
|
return b.TestString(hashPath(path.Join(split...)).String())
|
2020-04-27 13:06:21 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
// bytes returns the bloom filter serialized as a byte slice.
|
2020-06-17 11:54:41 -04:00
|
|
|
func (b *bloomFilter) bytes() []byte {
|
|
|
|
if b == nil || b.BloomFilter == nil {
|
2020-04-27 13:06:21 -04:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
|
|
_, err := b.WriteTo(&buf)
|
|
|
|
if err != nil {
|
|
|
|
logger.LogIf(GlobalContext, err)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return buf.Bytes()
|
|
|
|
}
|
|
|
|
|
|
|
|
// sort the dataUpdateTrackerHistory, newest first.
|
|
|
|
// Returns whether the history is complete.
|
|
|
|
func (d dataUpdateTrackerHistory) sort() bool {
|
|
|
|
if len(d) == 0 {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
sort.Slice(d, func(i, j int) bool {
|
|
|
|
return d[i].idx > d[j].idx
|
|
|
|
})
|
|
|
|
return d[0].idx-d[len(d)-1].idx == uint64(len(d))
|
|
|
|
}
|
|
|
|
|
|
|
|
// removeOlderThan will remove entries older than index 'n'.
|
|
|
|
func (d *dataUpdateTrackerHistory) removeOlderThan(n uint64) {
|
|
|
|
d.sort()
|
|
|
|
dd := *d
|
|
|
|
end := len(dd)
|
|
|
|
for i := end - 1; i >= 0; i-- {
|
|
|
|
if dd[i].idx < n {
|
|
|
|
end = i
|
|
|
|
}
|
|
|
|
}
|
|
|
|
dd = dd[:end]
|
|
|
|
*d = dd
|
|
|
|
}
|
|
|
|
|
|
|
|
// newBloomFilter returns a new bloom filter with default settings.
|
|
|
|
func (d *dataUpdateTracker) newBloomFilter() bloomFilter {
|
|
|
|
return bloomFilter{bloom.NewWithEstimates(dataUpdateTrackerEstItems, dataUpdateTrackerFP)}
|
|
|
|
}
|
|
|
|
|
|
|
|
// current returns the current index.
|
|
|
|
func (d *dataUpdateTracker) current() uint64 {
|
|
|
|
d.mu.Lock()
|
|
|
|
defer d.mu.Unlock()
|
|
|
|
return d.Current.idx
|
|
|
|
}
|
|
|
|
|
2020-10-28 12:18:35 -04:00
|
|
|
// latestWithDir returns the highest index that contains the directory.
|
|
|
|
// This means that any cycle higher than this does NOT contain the entry.
|
|
|
|
func (d *dataUpdateTracker) latestWithDir(dir string) uint64 {
|
2020-12-29 04:57:28 -05:00
|
|
|
dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:")
|
2020-10-28 12:18:35 -04:00
|
|
|
bucket, _ := path2BucketObjectWithBasePath("", dir)
|
|
|
|
if bucket == "" {
|
|
|
|
if d.debug && len(dir) > 0 {
|
2020-12-29 04:57:28 -05:00
|
|
|
console.Debugf(dateUpdateTrackerLogPrefix+" no bucket (%s)\n", dir)
|
2020-10-28 12:18:35 -04:00
|
|
|
}
|
|
|
|
return d.current()
|
|
|
|
}
|
|
|
|
if isReservedOrInvalidBucket(bucket, false) {
|
|
|
|
return d.current()
|
|
|
|
}
|
|
|
|
|
|
|
|
d.mu.Lock()
|
|
|
|
defer d.mu.Unlock()
|
|
|
|
if d.Current.bf.containsDir(dir) || d.Current.idx == 0 {
|
|
|
|
return d.Current.idx
|
|
|
|
}
|
|
|
|
if d.debug {
|
2020-12-29 04:57:28 -05:00
|
|
|
console.Debugf(dateUpdateTrackerLogPrefix+" current bloom does NOT contains dir %s\n", dir)
|
2020-10-28 12:18:35 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
idx := d.Current.idx - 1
|
|
|
|
for {
|
|
|
|
f := d.History.find(idx)
|
|
|
|
if f == nil || f.bf.containsDir(dir) || idx == 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
idx--
|
|
|
|
}
|
|
|
|
return idx
|
|
|
|
}
|
|
|
|
|
2020-04-27 13:06:21 -04:00
|
|
|
// start will load the current data from the drives start collecting information and
|
|
|
|
// start a saver goroutine.
|
|
|
|
// All of these will exit when the context is canceled.
|
|
|
|
func (d *dataUpdateTracker) start(ctx context.Context, drives ...string) {
|
2021-11-16 12:28:29 -05:00
|
|
|
if len(drives) == 0 {
|
2022-01-24 14:28:45 -05:00
|
|
|
logger.LogIf(ctx, errors.New("dataUpdateTracker.start: No local drives specified"))
|
2020-04-27 13:06:21 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
d.load(ctx, drives...)
|
|
|
|
go d.startCollector(ctx)
|
2020-08-20 16:17:42 -04:00
|
|
|
// startSaver will unlock.
|
|
|
|
d.mu.Lock()
|
2022-01-24 14:28:45 -05:00
|
|
|
go d.startSaver(ctx, dataUpdateTrackerSaveInterval, drives...)
|
2020-04-27 13:06:21 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
// load will attempt to load data tracking information from the supplied drives.
|
|
|
|
// The data will only be loaded if d.Saved is older than the one found on disk.
|
|
|
|
// The newest working cache will be kept in d.
|
|
|
|
// If no valid data usage tracker can be found d will remain unchanged.
|
|
|
|
// If object is shared the caller should lock it.
|
|
|
|
func (d *dataUpdateTracker) load(ctx context.Context, drives ...string) {
|
2021-11-16 12:28:29 -05:00
|
|
|
if len(drives) == 0 {
|
2022-01-24 14:28:45 -05:00
|
|
|
logger.LogIf(ctx, errors.New("dataUpdateTracker.load: No local drives specified"))
|
2020-04-27 13:06:21 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
for _, drive := range drives {
|
2020-04-28 04:16:57 -04:00
|
|
|
|
2020-04-27 13:06:21 -04:00
|
|
|
cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename)
|
2022-08-01 16:22:43 -04:00
|
|
|
f, err := OpenFile(cacheFormatPath, readMode, 0o666)
|
2020-04-27 13:06:21 -04:00
|
|
|
if err != nil {
|
2020-11-23 11:36:49 -05:00
|
|
|
if osIsNotExist(err) {
|
2020-04-27 13:06:21 -04:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
err = d.deserialize(f, d.Saved)
|
2020-07-20 15:28:48 -04:00
|
|
|
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
2020-04-27 13:06:21 -04:00
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
f.Close()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// startSaver will start a saver that will write d to all supplied drives at specific intervals.
|
2020-08-20 16:17:42 -04:00
|
|
|
// 'd' must be write locked when started and will be unlocked.
|
2020-04-27 13:06:21 -04:00
|
|
|
// The saver will save and exit when supplied context is closed.
|
2022-01-24 14:28:45 -05:00
|
|
|
func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives ...string) {
|
|
|
|
if len(drives) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-04-27 13:06:21 -04:00
|
|
|
saveNow := d.save
|
|
|
|
exited := make(chan struct{})
|
|
|
|
d.saveExited = exited
|
|
|
|
d.mu.Unlock()
|
2020-08-20 16:17:42 -04:00
|
|
|
t := time.NewTicker(interval)
|
|
|
|
defer t.Stop()
|
2020-04-27 13:06:21 -04:00
|
|
|
defer close(exited)
|
2020-08-20 16:17:42 -04:00
|
|
|
var buf bytes.Buffer
|
2020-04-27 13:06:21 -04:00
|
|
|
for {
|
|
|
|
var exit bool
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
exit = true
|
|
|
|
case <-t.C:
|
|
|
|
case <-saveNow:
|
|
|
|
}
|
|
|
|
buf.Reset()
|
|
|
|
d.mu.Lock()
|
2020-05-15 00:46:36 -04:00
|
|
|
if !d.dirty {
|
|
|
|
d.mu.Unlock()
|
2020-09-29 16:23:53 -04:00
|
|
|
if exit {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
continue
|
2020-05-15 00:46:36 -04:00
|
|
|
}
|
2020-04-27 13:06:21 -04:00
|
|
|
d.Saved = UTCNow()
|
|
|
|
err := d.serialize(&buf)
|
|
|
|
if d.debug {
|
2020-12-29 04:57:28 -05:00
|
|
|
console.Debugf(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v\n", buf.Len(), d.Current.idx)
|
2020-04-27 13:06:21 -04:00
|
|
|
}
|
2020-05-15 00:46:36 -04:00
|
|
|
d.dirty = false
|
2020-04-27 13:06:21 -04:00
|
|
|
d.mu.Unlock()
|
|
|
|
if err != nil {
|
|
|
|
logger.LogIf(ctx, err, "Error serializing usage tracker data")
|
|
|
|
if exit {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if buf.Len() == 0 {
|
|
|
|
logger.LogIf(ctx, errors.New("zero sized output, skipping save"))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
for _, drive := range drives {
|
|
|
|
cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename)
|
|
|
|
err := ioutil.WriteFile(cacheFormatPath, buf.Bytes(), os.ModePerm)
|
|
|
|
if err != nil {
|
2020-11-23 11:36:49 -05:00
|
|
|
if osIsNotExist(err) {
|
2020-04-28 04:16:57 -04:00
|
|
|
continue
|
|
|
|
}
|
2020-04-27 13:06:21 -04:00
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if exit {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// serialize all data in d to dst.
|
|
|
|
// Caller should hold lock if d is expected to be shared.
|
|
|
|
// If an error is returned, there will likely be partial data written to dst.
|
|
|
|
func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) {
|
|
|
|
ctx := GlobalContext
|
|
|
|
var tmp [8]byte
|
|
|
|
o := bufio.NewWriter(dst)
|
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
err = o.Flush()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Version
|
|
|
|
if err := o.WriteByte(dataUpdateTrackerVersion); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// Timestamp.
|
|
|
|
binary.LittleEndian.PutUint64(tmp[:], uint64(d.Saved.Unix()))
|
|
|
|
if _, err := o.Write(tmp[:]); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Current
|
|
|
|
binary.LittleEndian.PutUint64(tmp[:], d.Current.idx)
|
|
|
|
if _, err := o.Write(tmp[:]); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err := d.Current.bf.WriteTo(o); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// History
|
|
|
|
binary.LittleEndian.PutUint64(tmp[:], uint64(len(d.History)))
|
|
|
|
if _, err := o.Write(tmp[:]); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, bf := range d.History {
|
|
|
|
// Current
|
|
|
|
binary.LittleEndian.PutUint64(tmp[:], bf.idx)
|
|
|
|
if _, err := o.Write(tmp[:]); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err := bf.bf.WriteTo(o); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// deserialize will deserialize the supplied input if the input is newer than the supplied time.
|
|
|
|
func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) error {
|
|
|
|
ctx := GlobalContext
|
|
|
|
var dst dataUpdateTracker
|
|
|
|
var tmp [8]byte
|
|
|
|
|
|
|
|
// Version
|
|
|
|
if _, err := io.ReadFull(src, tmp[:1]); err != nil {
|
|
|
|
if d.debug {
|
2020-05-04 23:04:06 -04:00
|
|
|
if err != io.EOF {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
2020-04-27 13:06:21 -04:00
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
switch tmp[0] {
|
2021-08-25 11:25:26 -04:00
|
|
|
case 1, 2, 3, 4, 5, 6:
|
2021-05-17 11:31:04 -04:00
|
|
|
if intDataUpdateTracker.debug {
|
|
|
|
console.Debugln(color.Green("dataUpdateTracker: ") + "deprecated data version, updating.")
|
|
|
|
}
|
2020-06-12 13:28:21 -04:00
|
|
|
return nil
|
2020-04-27 13:06:21 -04:00
|
|
|
case dataUpdateTrackerVersion:
|
|
|
|
default:
|
|
|
|
return errors.New("dataUpdateTracker: Unknown data version")
|
|
|
|
}
|
|
|
|
// Timestamp.
|
|
|
|
if _, err := io.ReadFull(src, tmp[:8]); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
t := time.Unix(int64(binary.LittleEndian.Uint64(tmp[:])), 0)
|
|
|
|
if !t.After(newerThan) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Current
|
|
|
|
if _, err := io.ReadFull(src, tmp[:8]); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
dst.Current.idx = binary.LittleEndian.Uint64(tmp[:])
|
|
|
|
dst.Current.bf = emptyBloomFilter()
|
|
|
|
if _, err := dst.Current.bf.ReadFrom(src); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// History
|
|
|
|
if _, err := io.ReadFull(src, tmp[:8]); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
n := binary.LittleEndian.Uint64(tmp[:])
|
|
|
|
dst.History = make(dataUpdateTrackerHistory, int(n))
|
|
|
|
for i, e := range dst.History {
|
|
|
|
if _, err := io.ReadFull(src, tmp[:8]); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
e.idx = binary.LittleEndian.Uint64(tmp[:])
|
|
|
|
e.bf = emptyBloomFilter()
|
|
|
|
if _, err := e.bf.ReadFrom(src); err != nil {
|
|
|
|
if d.debug {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
dst.History[i] = e
|
|
|
|
}
|
|
|
|
// Ignore what remains on the stream.
|
|
|
|
// Update d:
|
2020-09-15 23:44:48 -04:00
|
|
|
d.mu.Lock()
|
|
|
|
defer d.mu.Unlock()
|
2020-04-27 13:06:21 -04:00
|
|
|
d.Current = dst.Current
|
|
|
|
d.History = dst.History
|
|
|
|
d.Saved = dst.Saved
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// start a collector that picks up entries from objectUpdatedCh
|
|
|
|
// and adds them to the current bloom filter.
|
|
|
|
func (d *dataUpdateTracker) startCollector(ctx context.Context) {
|
2021-12-02 14:29:16 -05:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case in := <-d.input:
|
|
|
|
bucket, _ := path2BucketObjectWithBasePath("", in)
|
|
|
|
if bucket == "" {
|
|
|
|
if d.debug && len(in) > 0 {
|
|
|
|
console.Debugf(color.Green("dataUpdateTracker:")+" no bucket (%s)\n", in)
|
|
|
|
}
|
|
|
|
continue
|
2020-10-28 12:18:35 -04:00
|
|
|
}
|
|
|
|
|
2021-12-02 14:29:16 -05:00
|
|
|
if isReservedOrInvalidBucket(bucket, false) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
split := splitPathDeterministic(in)
|
2020-04-27 13:06:21 -04:00
|
|
|
|
2021-12-02 14:29:16 -05:00
|
|
|
// Add all paths until done.
|
|
|
|
d.mu.Lock()
|
|
|
|
for i := range split {
|
|
|
|
d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
|
|
|
|
}
|
|
|
|
d.dirty = d.dirty || len(split) > 0
|
|
|
|
d.mu.Unlock()
|
2020-10-30 12:33:16 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-04-27 13:06:21 -04:00
|
|
|
|
2020-10-30 12:33:16 -04:00
|
|
|
// markDirty adds the supplied path to the current bloom filter.
|
2021-05-17 11:25:48 -04:00
|
|
|
func (d *dataUpdateTracker) markDirty(bucket, prefix string) {
|
2020-12-29 04:57:28 -05:00
|
|
|
dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:")
|
2021-05-17 11:25:48 -04:00
|
|
|
if bucket == "" && d.debug {
|
|
|
|
console.Debugf(dateUpdateTrackerLogPrefix + " no bucket specified\n")
|
2020-10-30 12:33:16 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-05-17 11:25:48 -04:00
|
|
|
if isReservedOrInvalidBucket(bucket, false) && d.debug {
|
2020-10-30 12:33:16 -04:00
|
|
|
return
|
2020-04-27 13:06:21 -04:00
|
|
|
}
|
2021-05-17 11:25:48 -04:00
|
|
|
split := splitPathDeterministic(pathJoin(bucket, prefix))
|
2020-10-30 12:33:16 -04:00
|
|
|
|
|
|
|
// Add all paths until done.
|
|
|
|
d.mu.Lock()
|
|
|
|
for i := range split {
|
|
|
|
d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
|
|
|
|
}
|
|
|
|
d.dirty = d.dirty || len(split) > 0
|
|
|
|
d.mu.Unlock()
|
2020-04-27 13:06:21 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
// find entry with specified index.
|
|
|
|
// Returns nil if not found.
|
|
|
|
func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter {
|
|
|
|
for _, f := range d {
|
|
|
|
if f.idx == idx {
|
|
|
|
return &f
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// filterFrom will return a combined bloom filter.
|
|
|
|
func (d *dataUpdateTracker) filterFrom(ctx context.Context, oldest, newest uint64) *bloomFilterResponse {
|
|
|
|
bf := d.newBloomFilter()
|
|
|
|
bfr := bloomFilterResponse{
|
|
|
|
OldestIdx: oldest,
|
|
|
|
CurrentIdx: d.Current.idx,
|
|
|
|
Complete: true,
|
|
|
|
}
|
|
|
|
// Loop through each index requested.
|
|
|
|
for idx := oldest; idx <= newest; idx++ {
|
|
|
|
v := d.History.find(idx)
|
|
|
|
if v == nil {
|
|
|
|
if d.Current.idx == idx {
|
|
|
|
// Merge current.
|
|
|
|
err := bf.Merge(d.Current.bf.BloomFilter)
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
if err != nil {
|
|
|
|
bfr.Complete = false
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
bfr.Complete = false
|
|
|
|
bfr.OldestIdx = idx + 1
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
err := bf.Merge(v.bf.BloomFilter)
|
|
|
|
if err != nil {
|
|
|
|
bfr.Complete = false
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
bfr.NewestIdx = idx
|
|
|
|
}
|
|
|
|
var dst bytes.Buffer
|
|
|
|
_, err := bf.WriteTo(&dst)
|
|
|
|
if err != nil {
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
bfr.Filter = dst.Bytes()
|
|
|
|
|
|
|
|
return &bfr
|
|
|
|
}
|
|
|
|
|
|
|
|
// cycleFilter will cycle the bloom filter to start recording to index y if not already.
|
|
|
|
// The response will contain a bloom filter starting at index x up to, but not including index y.
|
|
|
|
// If y is 0, the response will not update y, but return the currently recorded information
|
2020-10-28 12:18:35 -04:00
|
|
|
// from the oldest (unless 0, then it will be all) until and including current y.
|
|
|
|
func (d *dataUpdateTracker) cycleFilter(ctx context.Context, req bloomFilterRequest) (*bloomFilterResponse, error) {
|
|
|
|
if req.OldestClean != "" {
|
|
|
|
return &bloomFilterResponse{OldestIdx: d.latestWithDir(req.OldestClean)}, nil
|
|
|
|
}
|
|
|
|
current := req.Current
|
|
|
|
oldest := req.Oldest
|
2020-04-27 13:06:21 -04:00
|
|
|
d.mu.Lock()
|
|
|
|
defer d.mu.Unlock()
|
|
|
|
if current == 0 {
|
|
|
|
if len(d.History) == 0 {
|
|
|
|
return d.filterFrom(ctx, d.Current.idx, d.Current.idx), nil
|
|
|
|
}
|
|
|
|
d.History.sort()
|
2020-10-28 12:18:35 -04:00
|
|
|
if oldest == 0 {
|
|
|
|
oldest = d.History[len(d.History)-1].idx
|
|
|
|
}
|
|
|
|
return d.filterFrom(ctx, oldest, d.Current.idx), nil
|
2020-04-27 13:06:21 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
// Move current to history if new one requested
|
|
|
|
if d.Current.idx != current {
|
2020-05-15 00:46:36 -04:00
|
|
|
d.dirty = true
|
2020-04-27 13:06:21 -04:00
|
|
|
if d.debug {
|
2020-12-29 04:57:28 -05:00
|
|
|
console.Debugf(color.Green("dataUpdateTracker:")+" cycle bloom filter: %v -> %v\n", d.Current.idx, current)
|
2020-04-27 13:06:21 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
d.History = append(d.History, d.Current)
|
|
|
|
d.Current.idx = current
|
|
|
|
d.Current.bf = d.newBloomFilter()
|
|
|
|
select {
|
|
|
|
case d.save <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
d.History.removeOlderThan(oldest)
|
|
|
|
return d.filterFrom(ctx, oldest, current), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// splitPathDeterministic will split the provided relative path
|
|
|
|
// deterministically and return up to the first 3 elements of the path.
|
2021-04-15 19:32:13 -04:00
|
|
|
// slash and dot prefixes are removed.
|
2020-04-27 13:06:21 -04:00
|
|
|
// Trailing slashes are removed.
|
|
|
|
// Returns 0 length if no parts are found after trimming.
|
|
|
|
func splitPathDeterministic(in string) []string {
|
2020-10-30 12:33:16 -04:00
|
|
|
split := strings.Split(decodeDirObject(in), SlashSeparator)
|
2020-04-27 13:06:21 -04:00
|
|
|
|
|
|
|
// Trim empty start/end
|
|
|
|
for len(split) > 0 {
|
|
|
|
if len(split[0]) > 0 && split[0] != "." {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
split = split[1:]
|
|
|
|
}
|
|
|
|
for len(split) > 0 {
|
|
|
|
if len(split[len(split)-1]) > 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
split = split[:len(split)-1]
|
|
|
|
}
|
|
|
|
|
|
|
|
return split
|
|
|
|
}
|
|
|
|
|
|
|
|
// bloomFilterRequest asks cycleFilter to move recording to index Current and
// to return the merged filters from Oldest onward.
type bloomFilterRequest struct {
	// Oldest is the first index to include in the response.
	Oldest uint64
	// Current is the index to record into from now on; 0 means do not cycle.
	Current uint64
	// OldestClean, if set, makes the response carry only the oldest clean
	// version in OldestIdx; the rest of the request is ignored.
	OldestClean string
}

// bloomFilterResponse carries a merged bloom filter and the index range it covers.
type bloomFilterResponse struct {
	// CurrentIdx is the index currently being written to.
	CurrentIdx uint64
	// OldestIdx is the oldest index in the returned bloom filter.
	OldestIdx uint64
	// NewestIdx is the newest index in the returned bloom filter.
	NewestIdx uint64
	// Complete reports whether every index between oldest and newest is present.
	Complete bool
	// Filter is the serialized bloom filter.
	Filter []byte
}
|
|
|
|
|
2021-05-17 11:25:48 -04:00
|
|
|
// NSUpdated indicates namespace has been updated.
|
2020-10-30 12:33:16 -04:00
|
|
|
// The function will block until the entry has been picked up.
|
2021-05-17 11:25:48 -04:00
|
|
|
func NSUpdated(bucket, prefix string) {
|
2020-10-30 12:33:16 -04:00
|
|
|
if intDataUpdateTracker != nil {
|
2021-05-17 11:25:48 -04:00
|
|
|
intDataUpdateTracker.markDirty(bucket, prefix)
|
2020-04-27 13:06:21 -04:00
|
|
|
}
|
|
|
|
}
|