mirror of
https://github.com/minio/minio.git
synced 2025-01-12 07:23:23 -05:00
9667a170de
Lower forced compaction to 250K entries. If there is more than 250K entries on the top level force compact it and log an error.
1563 lines
43 KiB
Go
1563 lines
43 KiB
Go
// Copyright (c) 2015-2023 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net/http"
|
|
"path"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/cespare/xxhash/v2"
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/klauspost/compress/zstd"
|
|
"github.com/minio/madmin-go/v3"
|
|
"github.com/minio/minio/internal/bucket/lifecycle"
|
|
"github.com/tinylib/msgp/msgp"
|
|
"github.com/valyala/bytebufferpool"
|
|
)
|
|
|
|
//go:generate msgp -file $GOFILE -unexported
|
|
|
|
// dataUsageHash is the hash type used.
|
|
type dataUsageHash string
|
|
|
|
// sizeHistogramV1 is size histogram V1, which has fewer intervals esp. between
|
|
// 1024B and 1MiB.
|
|
type sizeHistogramV1 [dataUsageBucketLenV1]uint64
|
|
|
|
// sizeHistogram is a size histogram.
|
|
type sizeHistogram [dataUsageBucketLen]uint64
|
|
|
|
// versionsHistogram is a histogram of number of versions in an object.
|
|
type versionsHistogram [dataUsageVersionLen]uint64
|
|
|
|
type dataUsageEntry struct {
|
|
Children dataUsageHashMap `msg:"ch"`
|
|
// These fields do no include any children.
|
|
Size int64 `msg:"sz"`
|
|
Objects uint64 `msg:"os"`
|
|
Versions uint64 `msg:"vs"` // Versions that are not delete markers.
|
|
DeleteMarkers uint64 `msg:"dms"`
|
|
ObjSizes sizeHistogram `msg:"szs"`
|
|
ObjVersions versionsHistogram `msg:"vh"`
|
|
ReplicationStats *replicationAllStats `msg:"rs,omitempty"`
|
|
AllTierStats *allTierStats `msg:"ats,omitempty"`
|
|
Compacted bool `msg:"c"`
|
|
}
|
|
|
|
// allTierStats is a collection of per-tier stats across all configured remote
|
|
// tiers.
|
|
type allTierStats struct {
|
|
Tiers map[string]tierStats `msg:"ts"`
|
|
}
|
|
|
|
func newAllTierStats() *allTierStats {
|
|
return &allTierStats{
|
|
Tiers: make(map[string]tierStats),
|
|
}
|
|
}
|
|
|
|
func (ats *allTierStats) addSizes(tiers map[string]tierStats) {
|
|
for tier, st := range tiers {
|
|
ats.Tiers[tier] = ats.Tiers[tier].add(st)
|
|
}
|
|
}
|
|
|
|
func (ats *allTierStats) merge(other *allTierStats) {
|
|
for tier, st := range other.Tiers {
|
|
ats.Tiers[tier] = ats.Tiers[tier].add(st)
|
|
}
|
|
}
|
|
|
|
func (ats *allTierStats) clone() *allTierStats {
|
|
if ats == nil {
|
|
return nil
|
|
}
|
|
dst := *ats
|
|
dst.Tiers = make(map[string]tierStats, len(ats.Tiers))
|
|
for tier, st := range ats.Tiers {
|
|
dst.Tiers[tier] = st
|
|
}
|
|
return &dst
|
|
}
|
|
|
|
func (ats *allTierStats) populateStats(stats map[string]madmin.TierStats) {
|
|
if ats == nil {
|
|
return
|
|
}
|
|
|
|
// Update stats for tiers as they become available.
|
|
for tier, st := range ats.Tiers {
|
|
stats[tier] = madmin.TierStats{
|
|
TotalSize: st.TotalSize,
|
|
NumVersions: st.NumVersions,
|
|
NumObjects: st.NumObjects,
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// tierStats holds per-tier stats of a remote tier.
|
|
type tierStats struct {
|
|
TotalSize uint64 `msg:"ts"`
|
|
NumVersions int `msg:"nv"`
|
|
NumObjects int `msg:"no"`
|
|
}
|
|
|
|
func (ts tierStats) add(u tierStats) tierStats {
|
|
return tierStats{
|
|
TotalSize: ts.TotalSize + u.TotalSize,
|
|
NumVersions: ts.NumVersions + u.NumVersions,
|
|
NumObjects: ts.NumObjects + u.NumObjects,
|
|
}
|
|
}
|
|
|
|
//msgp:tuple replicationStatsV1
|
|
type replicationStatsV1 struct {
|
|
PendingSize uint64
|
|
ReplicatedSize uint64
|
|
FailedSize uint64
|
|
ReplicaSize uint64
|
|
FailedCount uint64
|
|
PendingCount uint64
|
|
MissedThresholdSize uint64
|
|
AfterThresholdSize uint64
|
|
MissedThresholdCount uint64
|
|
AfterThresholdCount uint64
|
|
}
|
|
|
|
func (rsv1 replicationStatsV1) Empty() bool {
|
|
return rsv1.ReplicatedSize == 0 &&
|
|
rsv1.FailedSize == 0 &&
|
|
rsv1.FailedCount == 0
|
|
}
|
|
|
|
//msgp:tuple replicationStats
|
|
type replicationStats struct {
|
|
PendingSize uint64
|
|
ReplicatedSize uint64
|
|
FailedSize uint64
|
|
FailedCount uint64
|
|
PendingCount uint64
|
|
MissedThresholdSize uint64
|
|
AfterThresholdSize uint64
|
|
MissedThresholdCount uint64
|
|
AfterThresholdCount uint64
|
|
ReplicatedCount uint64
|
|
}
|
|
|
|
func (rs replicationStats) Empty() bool {
|
|
return rs.ReplicatedSize == 0 &&
|
|
rs.FailedSize == 0 &&
|
|
rs.FailedCount == 0
|
|
}
|
|
|
|
type replicationAllStats struct {
|
|
Targets map[string]replicationStats `msg:"t,omitempty"`
|
|
ReplicaSize uint64 `msg:"r,omitempty"`
|
|
ReplicaCount uint64 `msg:"rc,omitempty"`
|
|
}
|
|
|
|
//msgp:tuple replicationAllStatsV1
|
|
type replicationAllStatsV1 struct {
|
|
Targets map[string]replicationStats
|
|
ReplicaSize uint64 `msg:"ReplicaSize,omitempty"`
|
|
ReplicaCount uint64 `msg:"ReplicaCount,omitempty"`
|
|
}
|
|
|
|
// empty returns true if the replicationAllStats is empty (contains no entries).
|
|
func (r *replicationAllStats) empty() bool {
|
|
if r == nil {
|
|
return true
|
|
}
|
|
if r.ReplicaSize != 0 || r.ReplicaCount != 0 {
|
|
return false
|
|
}
|
|
for _, v := range r.Targets {
|
|
if !v.Empty() {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// clone creates a deep-copy clone.
|
|
func (r *replicationAllStats) clone() *replicationAllStats {
|
|
if r == nil {
|
|
return nil
|
|
}
|
|
|
|
// Shallow copy
|
|
dst := *r
|
|
|
|
// Copy individual targets.
|
|
dst.Targets = make(map[string]replicationStats, len(r.Targets))
|
|
for k, v := range r.Targets {
|
|
dst.Targets[k] = v
|
|
}
|
|
|
|
return &dst
|
|
}
|
|
|
|
//msgp:encode ignore dataUsageEntryV2 dataUsageEntryV3 dataUsageEntryV4 dataUsageEntryV5 dataUsageEntryV6 dataUsageEntryV7
|
|
//msgp:marshal ignore dataUsageEntryV2 dataUsageEntryV3 dataUsageEntryV4 dataUsageEntryV5 dataUsageEntryV6 dataUsageEntryV7
|
|
|
|
//msgp:tuple dataUsageEntryV2
|
|
type dataUsageEntryV2 struct {
|
|
// These fields do no include any children.
|
|
Size int64
|
|
Objects uint64
|
|
ObjSizes sizeHistogram
|
|
Children dataUsageHashMap
|
|
}
|
|
|
|
//msgp:tuple dataUsageEntryV3
|
|
type dataUsageEntryV3 struct {
|
|
// These fields do no include any children.
|
|
Size int64
|
|
ReplicatedSize uint64
|
|
ReplicationPendingSize uint64
|
|
ReplicationFailedSize uint64
|
|
ReplicaSize uint64
|
|
Objects uint64
|
|
ObjSizes sizeHistogram
|
|
Children dataUsageHashMap
|
|
}
|
|
|
|
//msgp:tuple dataUsageEntryV4
|
|
type dataUsageEntryV4 struct {
|
|
Children dataUsageHashMap
|
|
// These fields do no include any children.
|
|
Size int64
|
|
Objects uint64
|
|
ObjSizes sizeHistogram
|
|
ReplicationStats replicationStatsV1
|
|
}
|
|
|
|
//msgp:tuple dataUsageEntryV5
|
|
type dataUsageEntryV5 struct {
|
|
Children dataUsageHashMap
|
|
// These fields do no include any children.
|
|
Size int64
|
|
Objects uint64
|
|
Versions uint64 // Versions that are not delete markers.
|
|
ObjSizes sizeHistogram
|
|
ReplicationStats *replicationStatsV1
|
|
Compacted bool
|
|
}
|
|
|
|
//msgp:tuple dataUsageEntryV6
|
|
type dataUsageEntryV6 struct {
|
|
Children dataUsageHashMap
|
|
// These fields do no include any children.
|
|
Size int64
|
|
Objects uint64
|
|
Versions uint64 // Versions that are not delete markers.
|
|
ObjSizes sizeHistogram
|
|
ReplicationStats *replicationAllStatsV1
|
|
Compacted bool
|
|
}
|
|
|
|
type dataUsageEntryV7 struct {
|
|
Children dataUsageHashMap `msg:"ch"`
|
|
// These fields do no include any children.
|
|
Size int64 `msg:"sz"`
|
|
Objects uint64 `msg:"os"`
|
|
Versions uint64 `msg:"vs"` // Versions that are not delete markers.
|
|
DeleteMarkers uint64 `msg:"dms"`
|
|
ObjSizes sizeHistogramV1 `msg:"szs"`
|
|
ObjVersions versionsHistogram `msg:"vh"`
|
|
ReplicationStats *replicationAllStats `msg:"rs,omitempty"`
|
|
AllTierStats *allTierStats `msg:"ats,omitempty"`
|
|
Compacted bool `msg:"c"`
|
|
}
|
|
|
|
// dataUsageCache contains a cache of data usage entries latest version.
|
|
type dataUsageCache struct {
|
|
Info dataUsageCacheInfo
|
|
Cache map[string]dataUsageEntry
|
|
}
|
|
|
|
//msgp:encode ignore dataUsageCacheV2 dataUsageCacheV3 dataUsageCacheV4 dataUsageCacheV5 dataUsageCacheV6 dataUsageCacheV7
|
|
//msgp:marshal ignore dataUsageCacheV2 dataUsageCacheV3 dataUsageCacheV4 dataUsageCacheV5 dataUsageCacheV6 dataUsageCacheV7
|
|
|
|
// dataUsageCacheV2 contains a cache of data usage entries version 2.
|
|
type dataUsageCacheV2 struct {
|
|
Info dataUsageCacheInfo
|
|
Cache map[string]dataUsageEntryV2
|
|
}
|
|
|
|
// dataUsageCacheV3 contains a cache of data usage entries version 3.
|
|
type dataUsageCacheV3 struct {
|
|
Info dataUsageCacheInfo
|
|
Cache map[string]dataUsageEntryV3
|
|
}
|
|
|
|
// dataUsageCacheV4 contains a cache of data usage entries version 4.
|
|
type dataUsageCacheV4 struct {
|
|
Info dataUsageCacheInfo
|
|
Cache map[string]dataUsageEntryV4
|
|
}
|
|
|
|
// dataUsageCacheV5 contains a cache of data usage entries version 5.
|
|
type dataUsageCacheV5 struct {
|
|
Info dataUsageCacheInfo
|
|
Cache map[string]dataUsageEntryV5
|
|
}
|
|
|
|
// dataUsageCacheV6 contains a cache of data usage entries version 6.
|
|
type dataUsageCacheV6 struct {
|
|
Info dataUsageCacheInfo
|
|
Cache map[string]dataUsageEntryV6
|
|
}
|
|
|
|
// dataUsageCacheV7 contains a cache of data usage entries version 7.
|
|
type dataUsageCacheV7 struct {
|
|
Info dataUsageCacheInfo
|
|
Cache map[string]dataUsageEntryV7
|
|
}
|
|
|
|
//msgp:ignore dataUsageEntryInfo
|
|
type dataUsageEntryInfo struct {
|
|
Name string
|
|
Parent string
|
|
Entry dataUsageEntry
|
|
}
|
|
|
|
type dataUsageCacheInfo struct {
|
|
// Name of the bucket. Also root element.
|
|
Name string
|
|
NextCycle uint32
|
|
LastUpdate time.Time
|
|
// indicates if the disk is being healed and scanner
|
|
// should skip healing the disk
|
|
SkipHealing bool
|
|
|
|
// Active lifecycle, if any on the bucket
|
|
lifeCycle *lifecycle.Lifecycle `msg:"-"`
|
|
|
|
// optional updates channel.
|
|
// If set updates will be sent regularly to this channel.
|
|
// Will not be closed when returned.
|
|
updates chan<- dataUsageEntry `msg:"-"`
|
|
replication replicationConfig `msg:"-"`
|
|
}
|
|
|
|
func (e *dataUsageEntry) addSizes(summary sizeSummary) {
|
|
e.Size += summary.totalSize
|
|
e.Versions += summary.versions
|
|
e.DeleteMarkers += summary.deleteMarkers
|
|
e.ObjSizes.add(summary.totalSize)
|
|
e.ObjVersions.add(summary.versions)
|
|
|
|
if e.ReplicationStats == nil {
|
|
e.ReplicationStats = &replicationAllStats{
|
|
Targets: make(map[string]replicationStats),
|
|
}
|
|
} else if e.ReplicationStats.Targets == nil {
|
|
e.ReplicationStats.Targets = make(map[string]replicationStats)
|
|
}
|
|
e.ReplicationStats.ReplicaSize += uint64(summary.replicaSize)
|
|
e.ReplicationStats.ReplicaCount += uint64(summary.replicaCount)
|
|
|
|
for arn, st := range summary.replTargetStats {
|
|
tgtStat, ok := e.ReplicationStats.Targets[arn]
|
|
if !ok {
|
|
tgtStat = replicationStats{}
|
|
}
|
|
tgtStat.PendingSize += uint64(st.pendingSize)
|
|
tgtStat.FailedSize += uint64(st.failedSize)
|
|
tgtStat.ReplicatedSize += uint64(st.replicatedSize)
|
|
tgtStat.ReplicatedCount += uint64(st.replicatedCount)
|
|
tgtStat.FailedCount += st.failedCount
|
|
tgtStat.PendingCount += st.pendingCount
|
|
e.ReplicationStats.Targets[arn] = tgtStat
|
|
}
|
|
if len(summary.tiers) != 0 {
|
|
if e.AllTierStats == nil {
|
|
e.AllTierStats = newAllTierStats()
|
|
}
|
|
e.AllTierStats.addSizes(summary.tiers)
|
|
}
|
|
}
|
|
|
|
// merge other data usage entry into this, excluding children.
|
|
func (e *dataUsageEntry) merge(other dataUsageEntry) {
|
|
e.Objects += other.Objects
|
|
e.Versions += other.Versions
|
|
e.DeleteMarkers += other.DeleteMarkers
|
|
e.Size += other.Size
|
|
if other.ReplicationStats != nil {
|
|
if e.ReplicationStats == nil {
|
|
e.ReplicationStats = &replicationAllStats{Targets: make(map[string]replicationStats)}
|
|
} else if e.ReplicationStats.Targets == nil {
|
|
e.ReplicationStats.Targets = make(map[string]replicationStats)
|
|
}
|
|
e.ReplicationStats.ReplicaSize += other.ReplicationStats.ReplicaSize
|
|
e.ReplicationStats.ReplicaCount += other.ReplicationStats.ReplicaCount
|
|
for arn, stat := range other.ReplicationStats.Targets {
|
|
st := e.ReplicationStats.Targets[arn]
|
|
e.ReplicationStats.Targets[arn] = replicationStats{
|
|
PendingSize: stat.PendingSize + st.PendingSize,
|
|
FailedSize: stat.FailedSize + st.FailedSize,
|
|
ReplicatedSize: stat.ReplicatedSize + st.ReplicatedSize,
|
|
PendingCount: stat.PendingCount + st.PendingCount,
|
|
FailedCount: stat.FailedCount + st.FailedCount,
|
|
ReplicatedCount: stat.ReplicatedCount + st.ReplicatedCount,
|
|
}
|
|
}
|
|
}
|
|
|
|
for i, v := range other.ObjSizes[:] {
|
|
e.ObjSizes[i] += v
|
|
}
|
|
|
|
for i, v := range other.ObjVersions[:] {
|
|
e.ObjVersions[i] += v
|
|
}
|
|
|
|
if other.AllTierStats != nil && len(other.AllTierStats.Tiers) != 0 {
|
|
if e.AllTierStats == nil {
|
|
e.AllTierStats = newAllTierStats()
|
|
}
|
|
e.AllTierStats.merge(other.AllTierStats)
|
|
}
|
|
}
|
|
|
|
// mod returns true if the hash mod cycles == cycle.
|
|
// If cycles is 0 false is always returned.
|
|
// If cycles is 1 true is always returned (as expected).
|
|
func (h dataUsageHash) mod(cycle uint32, cycles uint32) bool {
|
|
if cycles <= 1 {
|
|
return cycles == 1
|
|
}
|
|
return uint32(xxhash.Sum64String(string(h)))%cycles == cycle%cycles
|
|
}
|
|
|
|
// modAlt returns true if the hash mod cycles == cycle.
|
|
// This is out of sync with mod.
|
|
// If cycles is 0 false is always returned.
|
|
// If cycles is 1 true is always returned (as expected).
|
|
func (h dataUsageHash) modAlt(cycle uint32, cycles uint32) bool {
|
|
if cycles <= 1 {
|
|
return cycles == 1
|
|
}
|
|
return uint32(xxhash.Sum64String(string(h))>>32)%(cycles) == cycle%cycles
|
|
}
|
|
|
|
// addChild will add a child based on its hash.
|
|
// If it already exists it will not be added again.
|
|
func (e *dataUsageEntry) addChild(hash dataUsageHash) {
|
|
if _, ok := e.Children[hash.Key()]; ok {
|
|
return
|
|
}
|
|
if e.Children == nil {
|
|
e.Children = make(dataUsageHashMap, 1)
|
|
}
|
|
e.Children[hash.Key()] = struct{}{}
|
|
}
|
|
|
|
// Create a clone of the entry.
|
|
func (e dataUsageEntry) clone() dataUsageEntry {
|
|
// We operate on a copy from the receiver.
|
|
if e.Children != nil {
|
|
ch := make(dataUsageHashMap, len(e.Children))
|
|
for k, v := range e.Children {
|
|
ch[k] = v
|
|
}
|
|
e.Children = ch
|
|
}
|
|
if e.ReplicationStats != nil {
|
|
// Clone ReplicationStats
|
|
e.ReplicationStats = e.ReplicationStats.clone()
|
|
}
|
|
if e.AllTierStats != nil {
|
|
e.AllTierStats = e.AllTierStats.clone()
|
|
}
|
|
return e
|
|
}
|
|
|
|
// find a path in the cache.
|
|
// Returns nil if not found.
|
|
func (d *dataUsageCache) find(path string) *dataUsageEntry {
|
|
due, ok := d.Cache[hashPath(path).Key()]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return &due
|
|
}
|
|
|
|
// isCompacted returns whether an entry is compacted.
|
|
// Returns false if not found.
|
|
func (d *dataUsageCache) isCompacted(h dataUsageHash) bool {
|
|
due, ok := d.Cache[h.Key()]
|
|
if !ok {
|
|
return false
|
|
}
|
|
return due.Compacted
|
|
}
|
|
|
|
// findChildrenCopy returns a copy of the children of the supplied hash.
|
|
func (d *dataUsageCache) findChildrenCopy(h dataUsageHash) dataUsageHashMap {
|
|
ch := d.Cache[h.String()].Children
|
|
res := make(dataUsageHashMap, len(ch))
|
|
for k := range ch {
|
|
res[k] = struct{}{}
|
|
}
|
|
return res
|
|
}
|
|
|
|
// searchParent will search for the parent of h.
|
|
// This is an O(N*N) operation if there is no parent or it cannot be guessed.
|
|
func (d *dataUsageCache) searchParent(h dataUsageHash) *dataUsageHash {
|
|
want := h.Key()
|
|
if idx := strings.LastIndexByte(want, '/'); idx >= 0 {
|
|
if v := d.find(want[:idx]); v != nil {
|
|
_, ok := v.Children[want]
|
|
if ok {
|
|
found := hashPath(want[:idx])
|
|
return &found
|
|
}
|
|
}
|
|
}
|
|
for k, v := range d.Cache {
|
|
_, ok := v.Children[want]
|
|
if ok {
|
|
found := dataUsageHash(k)
|
|
return &found
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// deleteRecursive will delete an entry recursively, but not change its parent.
|
|
func (d *dataUsageCache) deleteRecursive(h dataUsageHash) {
|
|
if existing, ok := d.Cache[h.String()]; ok {
|
|
// Delete first if there should be a loop.
|
|
delete(d.Cache, h.Key())
|
|
for child := range existing.Children {
|
|
d.deleteRecursive(dataUsageHash(child))
|
|
}
|
|
}
|
|
}
|
|
|
|
// dui converts the flattened version of the path to madmin.DataUsageInfo.
|
|
// As a side effect d will be flattened, use a clone if this is not ok.
|
|
func (d *dataUsageCache) dui(path string, buckets []BucketInfo) DataUsageInfo {
|
|
e := d.find(path)
|
|
if e == nil {
|
|
// No entry found, return empty.
|
|
return DataUsageInfo{}
|
|
}
|
|
flat := d.flatten(*e)
|
|
dui := DataUsageInfo{
|
|
LastUpdate: d.Info.LastUpdate,
|
|
ObjectsTotalCount: flat.Objects,
|
|
VersionsTotalCount: flat.Versions,
|
|
DeleteMarkersTotalCount: flat.DeleteMarkers,
|
|
ObjectsTotalSize: uint64(flat.Size),
|
|
BucketsCount: uint64(len(e.Children)),
|
|
BucketsUsage: d.bucketsUsageInfo(buckets),
|
|
TierStats: d.tiersUsageInfo(buckets),
|
|
}
|
|
return dui
|
|
}
|
|
|
|
// replace will add or replace an entry in the cache.
|
|
// If a parent is specified it will be added to that if not already there.
|
|
// If the parent does not exist, it will be added.
|
|
func (d *dataUsageCache) replace(path, parent string, e dataUsageEntry) {
|
|
hash := hashPath(path)
|
|
if d.Cache == nil {
|
|
d.Cache = make(map[string]dataUsageEntry, 100)
|
|
}
|
|
d.Cache[hash.Key()] = e
|
|
if parent != "" {
|
|
phash := hashPath(parent)
|
|
p := d.Cache[phash.Key()]
|
|
p.addChild(hash)
|
|
d.Cache[phash.Key()] = p
|
|
}
|
|
}
|
|
|
|
// replaceHashed add or replaces an entry to the cache based on its hash.
|
|
// If a parent is specified it will be added to that if not already there.
|
|
// If the parent does not exist, it will be added.
|
|
func (d *dataUsageCache) replaceHashed(hash dataUsageHash, parent *dataUsageHash, e dataUsageEntry) {
|
|
if d.Cache == nil {
|
|
d.Cache = make(map[string]dataUsageEntry, 100)
|
|
}
|
|
d.Cache[hash.Key()] = e
|
|
if parent != nil {
|
|
p := d.Cache[parent.Key()]
|
|
p.addChild(hash)
|
|
d.Cache[parent.Key()] = p
|
|
}
|
|
}
|
|
|
|
// copyWithChildren will copy entry with hash from src if it exists along with any children.
|
|
// If a parent is specified it will be added to that if not already there.
|
|
// If the parent does not exist, it will be added.
|
|
func (d *dataUsageCache) copyWithChildren(src *dataUsageCache, hash dataUsageHash, parent *dataUsageHash) {
|
|
if d.Cache == nil {
|
|
d.Cache = make(map[string]dataUsageEntry, 100)
|
|
}
|
|
e, ok := src.Cache[hash.String()]
|
|
if !ok {
|
|
return
|
|
}
|
|
d.Cache[hash.Key()] = e
|
|
for ch := range e.Children {
|
|
if ch == hash.Key() {
|
|
scannerLogIf(GlobalContext, errors.New("dataUsageCache.copyWithChildren: Circular reference"))
|
|
return
|
|
}
|
|
d.copyWithChildren(src, dataUsageHash(ch), &hash)
|
|
}
|
|
if parent != nil {
|
|
p := d.Cache[parent.Key()]
|
|
p.addChild(hash)
|
|
d.Cache[parent.Key()] = p
|
|
}
|
|
}
|
|
|
|
// reduceChildrenOf will reduce the recursive number of children to the limit
|
|
// by compacting the children with the least number of objects.
|
|
func (d *dataUsageCache) reduceChildrenOf(path dataUsageHash, limit int, compactSelf bool) {
|
|
e, ok := d.Cache[path.Key()]
|
|
if !ok {
|
|
return
|
|
}
|
|
if e.Compacted {
|
|
return
|
|
}
|
|
// If direct children have more, compact all.
|
|
if len(e.Children) > limit && compactSelf {
|
|
flat := d.sizeRecursive(path.Key())
|
|
flat.Compacted = true
|
|
d.deleteRecursive(path)
|
|
d.replaceHashed(path, nil, *flat)
|
|
return
|
|
}
|
|
total := d.totalChildrenRec(path.Key())
|
|
if total < limit {
|
|
return
|
|
}
|
|
|
|
// Appears to be printed with _MINIO_SERVER_DEBUG=off
|
|
// console.Debugf(" %d children found, compacting %v\n", total, path)
|
|
|
|
leaves := make([]struct {
|
|
objects uint64
|
|
path dataUsageHash
|
|
}, total)
|
|
// Collect current leaves that have children.
|
|
leaves = leaves[:0]
|
|
remove := total - limit
|
|
var add func(path dataUsageHash)
|
|
add = func(path dataUsageHash) {
|
|
e, ok := d.Cache[path.Key()]
|
|
if !ok {
|
|
return
|
|
}
|
|
if len(e.Children) == 0 {
|
|
return
|
|
}
|
|
sz := d.sizeRecursive(path.Key())
|
|
leaves = append(leaves, struct {
|
|
objects uint64
|
|
path dataUsageHash
|
|
}{objects: sz.Objects, path: path})
|
|
for ch := range e.Children {
|
|
add(dataUsageHash(ch))
|
|
}
|
|
}
|
|
|
|
// Add path recursively.
|
|
add(path)
|
|
sort.Slice(leaves, func(i, j int) bool {
|
|
return leaves[i].objects < leaves[j].objects
|
|
})
|
|
for remove > 0 && len(leaves) > 0 {
|
|
// Remove top entry.
|
|
e := leaves[0]
|
|
candidate := e.path
|
|
if candidate == path && !compactSelf {
|
|
// We should be the biggest,
|
|
// if we cannot compact ourself, we are done.
|
|
break
|
|
}
|
|
removing := d.totalChildrenRec(candidate.Key())
|
|
flat := d.sizeRecursive(candidate.Key())
|
|
if flat == nil {
|
|
leaves = leaves[1:]
|
|
continue
|
|
}
|
|
// Appears to be printed with _MINIO_SERVER_DEBUG=off
|
|
// console.Debugf("compacting %v, removing %d children\n", candidate, removing)
|
|
|
|
flat.Compacted = true
|
|
d.deleteRecursive(candidate)
|
|
d.replaceHashed(candidate, nil, *flat)
|
|
|
|
// Remove top entry and subtract removed children.
|
|
remove -= removing
|
|
leaves = leaves[1:]
|
|
}
|
|
}
|
|
|
|
// forceCompact will force compact the cache of the top entry.
|
|
// If the number of children is more than limit*100, it will compact self.
|
|
// When above the limit a cleanup will also be performed to remove any possible abandoned entries.
|
|
func (d *dataUsageCache) forceCompact(limit int) {
|
|
if d == nil || len(d.Cache) <= limit {
|
|
return
|
|
}
|
|
top := hashPath(d.Info.Name).Key()
|
|
topE := d.find(top)
|
|
if topE == nil {
|
|
scannerLogIf(GlobalContext, errors.New("forceCompact: root not found"))
|
|
return
|
|
}
|
|
// If off by 2 orders of magnitude, compact self and log error.
|
|
if len(topE.Children) > dataScannerForceCompactAtFolders {
|
|
// If we still have too many children, compact self.
|
|
scannerLogOnceIf(GlobalContext, fmt.Errorf("forceCompact: %q has %d children. Force compacting. Expect reduced scanner performance", d.Info.Name, len(topE.Children)), d.Info.Name)
|
|
d.reduceChildrenOf(hashPath(d.Info.Name), limit, true)
|
|
}
|
|
if len(d.Cache) <= limit {
|
|
return
|
|
}
|
|
|
|
// Check for abandoned entries.
|
|
found := make(map[string]struct{}, len(d.Cache))
|
|
|
|
// Mark all children recursively
|
|
var mark func(entry dataUsageEntry)
|
|
mark = func(entry dataUsageEntry) {
|
|
for k := range entry.Children {
|
|
found[k] = struct{}{}
|
|
if ch, ok := d.Cache[k]; ok {
|
|
mark(ch)
|
|
}
|
|
}
|
|
}
|
|
found[top] = struct{}{}
|
|
mark(*topE)
|
|
|
|
// Delete all entries not found.
|
|
for k := range d.Cache {
|
|
if _, ok := found[k]; !ok {
|
|
delete(d.Cache, k)
|
|
}
|
|
}
|
|
}
|
|
|
|
// StringAll returns a detailed string representation of all entries in the cache.
|
|
func (d *dataUsageCache) StringAll() string {
|
|
// Remove bloom filter from print.
|
|
s := fmt.Sprintf("info:%+v\n", d.Info)
|
|
for k, v := range d.Cache {
|
|
s += fmt.Sprintf("\t%v: %+v\n", k, v)
|
|
}
|
|
return strings.TrimSpace(s)
|
|
}
|
|
|
|
// String returns a human readable representation of the string.
|
|
func (h dataUsageHash) String() string {
|
|
return string(h)
|
|
}
|
|
|
|
// Key returns the key.
|
|
func (h dataUsageHash) Key() string {
|
|
return string(h)
|
|
}
|
|
|
|
func (d *dataUsageCache) flattenChildrens(root dataUsageEntry) (m map[string]dataUsageEntry) {
|
|
m = make(map[string]dataUsageEntry)
|
|
for id := range root.Children {
|
|
e := d.Cache[id]
|
|
if len(e.Children) > 0 {
|
|
e = d.flatten(e)
|
|
}
|
|
m[id] = e
|
|
}
|
|
return m
|
|
}
|
|
|
|
// flatten all children of the root into the root element and return it.
|
|
func (d *dataUsageCache) flatten(root dataUsageEntry) dataUsageEntry {
|
|
for id := range root.Children {
|
|
e := d.Cache[id]
|
|
if len(e.Children) > 0 {
|
|
e = d.flatten(e)
|
|
}
|
|
root.merge(e)
|
|
}
|
|
root.Children = nil
|
|
return root
|
|
}
|
|
|
|
// add a size to the histogram.
|
|
func (h *sizeHistogram) add(size int64) {
|
|
// Fetch the histogram interval corresponding
|
|
// to the passed object size.
|
|
for i, interval := range ObjectsHistogramIntervals[:] {
|
|
if size >= interval.start && size <= interval.end {
|
|
h[i]++
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// mergeV1 is used to migrate data usage cache from sizeHistogramV1 to
|
|
// sizeHistogram
|
|
func (h *sizeHistogram) mergeV1(v sizeHistogramV1) {
|
|
var oidx, nidx int
|
|
for oidx < len(v) {
|
|
intOld, intNew := ObjectsHistogramIntervalsV1[oidx], ObjectsHistogramIntervals[nidx]
|
|
// skip intervals that aren't common to both histograms
|
|
if intOld.start != intNew.start || intOld.end != intNew.end {
|
|
nidx++
|
|
continue
|
|
}
|
|
h[nidx] += v[oidx]
|
|
oidx++
|
|
nidx++
|
|
}
|
|
}
|
|
|
|
// toMap returns the map to a map[string]uint64.
|
|
func (h *sizeHistogram) toMap() map[string]uint64 {
|
|
res := make(map[string]uint64, dataUsageBucketLen)
|
|
var splCount uint64
|
|
for i, count := range h {
|
|
szInt := ObjectsHistogramIntervals[i]
|
|
switch {
|
|
case humanize.KiByte == szInt.start && szInt.end == humanize.MiByte-1:
|
|
// spl interval: [1024B, 1MiB)
|
|
res[szInt.name] = splCount
|
|
case humanize.KiByte <= szInt.start && szInt.end <= humanize.MiByte-1:
|
|
// intervals that fall within the spl interval above; they
|
|
// appear earlier in this array of intervals, see
|
|
// ObjectsHistogramIntervals
|
|
splCount += count
|
|
fallthrough
|
|
default:
|
|
res[szInt.name] = count
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
// add a version count to the histogram.
|
|
func (h *versionsHistogram) add(versions uint64) {
|
|
// Fetch the histogram interval corresponding
|
|
// to the passed object size.
|
|
for i, interval := range ObjectsVersionCountIntervals[:] {
|
|
if versions >= uint64(interval.start) && versions <= uint64(interval.end) {
|
|
h[i]++
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// toMap returns the map to a map[string]uint64.
|
|
func (h *versionsHistogram) toMap() map[string]uint64 {
|
|
res := make(map[string]uint64, dataUsageVersionLen)
|
|
for i, count := range h {
|
|
res[ObjectsVersionCountIntervals[i].name] = count
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (d *dataUsageCache) tiersUsageInfo(buckets []BucketInfo) *allTierStats {
|
|
dst := newAllTierStats()
|
|
for _, bucket := range buckets {
|
|
e := d.find(bucket.Name)
|
|
if e == nil {
|
|
continue
|
|
}
|
|
flat := d.flatten(*e)
|
|
if flat.AllTierStats == nil {
|
|
continue
|
|
}
|
|
dst.merge(flat.AllTierStats)
|
|
}
|
|
if len(dst.Tiers) == 0 {
|
|
return nil
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// bucketsUsageInfo returns the buckets usage info as a map, with
|
|
// key as bucket name
|
|
func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]BucketUsageInfo {
|
|
dst := make(map[string]BucketUsageInfo, len(buckets))
|
|
for _, bucket := range buckets {
|
|
e := d.find(bucket.Name)
|
|
if e == nil {
|
|
continue
|
|
}
|
|
flat := d.flatten(*e)
|
|
bui := BucketUsageInfo{
|
|
Size: uint64(flat.Size),
|
|
VersionsCount: flat.Versions,
|
|
ObjectsCount: flat.Objects,
|
|
DeleteMarkersCount: flat.DeleteMarkers,
|
|
ObjectSizesHistogram: flat.ObjSizes.toMap(),
|
|
ObjectVersionsHistogram: flat.ObjVersions.toMap(),
|
|
}
|
|
if flat.ReplicationStats != nil {
|
|
bui.ReplicaSize = flat.ReplicationStats.ReplicaSize
|
|
bui.ReplicaCount = flat.ReplicationStats.ReplicaCount
|
|
|
|
bui.ReplicationInfo = make(map[string]BucketTargetUsageInfo, len(flat.ReplicationStats.Targets))
|
|
for arn, stat := range flat.ReplicationStats.Targets {
|
|
bui.ReplicationInfo[arn] = BucketTargetUsageInfo{
|
|
ReplicationPendingSize: stat.PendingSize,
|
|
ReplicatedSize: stat.ReplicatedSize,
|
|
ReplicationFailedSize: stat.FailedSize,
|
|
ReplicationPendingCount: stat.PendingCount,
|
|
ReplicationFailedCount: stat.FailedCount,
|
|
ReplicatedCount: stat.ReplicatedCount,
|
|
}
|
|
}
|
|
}
|
|
dst[bucket.Name] = bui
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// sizeRecursive returns the path as a flattened entry.
|
|
func (d *dataUsageCache) sizeRecursive(path string) *dataUsageEntry {
|
|
root := d.find(path)
|
|
if root == nil || len(root.Children) == 0 {
|
|
return root
|
|
}
|
|
flat := d.flatten(*root)
|
|
if flat.ReplicationStats.empty() {
|
|
flat.ReplicationStats = nil
|
|
}
|
|
return &flat
|
|
}
|
|
|
|
// totalChildrenRec returns the total number of children recorded.
|
|
func (d *dataUsageCache) totalChildrenRec(path string) int {
|
|
root := d.find(path)
|
|
if root == nil || len(root.Children) == 0 {
|
|
return 0
|
|
}
|
|
n := len(root.Children)
|
|
for ch := range root.Children {
|
|
n += d.totalChildrenRec(ch)
|
|
}
|
|
return n
|
|
}
|
|
|
|
// root returns the root of the cache.
|
|
func (d *dataUsageCache) root() *dataUsageEntry {
|
|
return d.find(d.Info.Name)
|
|
}
|
|
|
|
// rootHash returns the root of the cache.
|
|
func (d *dataUsageCache) rootHash() dataUsageHash {
|
|
return hashPath(d.Info.Name)
|
|
}
|
|
|
|
// clone returns a copy of the cache with no references to the existing.
|
|
func (d *dataUsageCache) clone() dataUsageCache {
|
|
clone := dataUsageCache{
|
|
Info: d.Info,
|
|
Cache: make(map[string]dataUsageEntry, len(d.Cache)),
|
|
}
|
|
for k, v := range d.Cache {
|
|
clone.Cache[k] = v.clone()
|
|
}
|
|
return clone
|
|
}
|
|
|
|
// merge root of other into d.
|
|
// children of root will be flattened before being merged.
|
|
// Last update time will be set to the last updated.
|
|
func (d *dataUsageCache) merge(other dataUsageCache) {
|
|
existingRoot := d.root()
|
|
otherRoot := other.root()
|
|
if existingRoot == nil && otherRoot == nil {
|
|
return
|
|
}
|
|
if otherRoot == nil {
|
|
return
|
|
}
|
|
if existingRoot == nil {
|
|
*d = other.clone()
|
|
return
|
|
}
|
|
if other.Info.LastUpdate.After(d.Info.LastUpdate) {
|
|
d.Info.LastUpdate = other.Info.LastUpdate
|
|
}
|
|
existingRoot.merge(*otherRoot)
|
|
eHash := d.rootHash()
|
|
for key := range otherRoot.Children {
|
|
entry := other.Cache[key]
|
|
flat := other.flatten(entry)
|
|
existing := d.Cache[key]
|
|
// If not found, merging simply adds.
|
|
existing.merge(flat)
|
|
d.replaceHashed(dataUsageHash(key), &eHash, existing)
|
|
}
|
|
}
|
|
|
|
type objectIO interface {
|
|
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (reader *GetObjectReader, err error)
|
|
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
|
}
|
|
|
|
// load the cache content with name from minioMetaBackgroundOpsBucket.
|
|
// Only backend errors are returned as errors.
|
|
// The loader is optimistic and has no locking, but tries 5 times before giving up.
|
|
// If the object is not found, a nil error with empty data usage cache is returned.
|
|
func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
|
|
// By default, empty data usage cache
|
|
*d = dataUsageCache{}
|
|
|
|
load := func(name string, timeout time.Duration) (bool, error) {
|
|
// Abandon if more than time.Minute, so we don't hold up scanner.
|
|
// drive timeout by default is 2 minutes, we do not need to wait longer.
|
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
|
|
r, err := store.GetObjectNInfo(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, name), nil, http.Header{}, ObjectOptions{NoLock: true})
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case ObjectNotFound, BucketNotFound:
|
|
r, err = store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, ObjectOptions{NoLock: true})
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case ObjectNotFound, BucketNotFound:
|
|
return false, nil
|
|
case InsufficientReadQuorum, StorageErr:
|
|
return true, nil
|
|
}
|
|
return false, err
|
|
}
|
|
err = d.deserialize(r)
|
|
r.Close()
|
|
return err != nil, nil
|
|
case InsufficientReadQuorum, StorageErr:
|
|
return true, nil
|
|
}
|
|
return false, err
|
|
}
|
|
err = d.deserialize(r)
|
|
r.Close()
|
|
return err != nil, nil
|
|
}
|
|
|
|
// Caches are read+written without locks,
|
|
retries := 0
|
|
for retries < 5 {
|
|
retry, err := load(name, time.Minute)
|
|
if err != nil {
|
|
return toObjectErr(err, dataUsageBucket, name)
|
|
}
|
|
if !retry {
|
|
break
|
|
}
|
|
retry, err = load(name+".bkp", 30*time.Second)
|
|
if err == nil && !retry {
|
|
// Only return when we have valid data from the backup
|
|
break
|
|
}
|
|
retries++
|
|
time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
|
|
}
|
|
|
|
if retries == 5 {
|
|
scannerLogOnceIf(ctx, fmt.Errorf("maximum retry reached to load the data usage cache `%s`", name), "retry-loading-data-usage-cache")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Maximum running concurrent saves on server.
|
|
var maxConcurrentScannerSaves = make(chan struct{}, 4)
|
|
|
|
// save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
|
|
// Note that no locking is done when saving.
|
|
func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case maxConcurrentScannerSaves <- struct{}{}:
|
|
}
|
|
|
|
buf := bytebufferpool.Get()
|
|
defer func() {
|
|
<-maxConcurrentScannerSaves
|
|
buf.Reset()
|
|
bytebufferpool.Put(buf)
|
|
}()
|
|
|
|
if err := d.serializeTo(buf); err != nil {
|
|
return err
|
|
}
|
|
|
|
save := func(name string, timeout time.Duration) error {
|
|
// Abandon if more than a minute, so we don't hold up scanner.
|
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
|
|
return saveConfig(ctx, store, pathJoin(bucketMetaPrefix, name), buf.Bytes())
|
|
}
|
|
defer save(name+".bkp", 5*time.Second) // Keep a backup as well
|
|
|
|
// drive timeout by default is 2 minutes, we do not need to wait longer.
|
|
return save(name, time.Minute)
|
|
}
|
|
|
|
// dataUsageCacheVer indicates the cache version.
|
|
// Bumping the cache version will drop data from previous versions
|
|
// and write new data with the new version.
|
|
const (
|
|
dataUsageCacheVerCurrent = 8
|
|
dataUsageCacheVerV7 = 7
|
|
dataUsageCacheVerV6 = 6
|
|
dataUsageCacheVerV5 = 5
|
|
dataUsageCacheVerV4 = 4
|
|
dataUsageCacheVerV3 = 3
|
|
dataUsageCacheVerV2 = 2
|
|
dataUsageCacheVerV1 = 1
|
|
)
|
|
|
|
// serialize the contents of the cache.
|
|
func (d *dataUsageCache) serializeTo(dst io.Writer) error {
|
|
// Add version and compress.
|
|
_, err := dst.Write([]byte{dataUsageCacheVerCurrent})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
enc, err := zstd.NewWriter(dst,
|
|
zstd.WithEncoderLevel(zstd.SpeedFastest),
|
|
zstd.WithWindowSize(1<<20),
|
|
zstd.WithEncoderConcurrency(2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mEnc := msgp.NewWriter(enc)
|
|
err = d.EncodeMsg(mEnc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = mEnc.Flush()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = enc.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// deserialize the supplied byte slice into the cache.
|
|
func (d *dataUsageCache) deserialize(r io.Reader) error {
|
|
var b [1]byte
|
|
n, _ := r.Read(b[:])
|
|
if n != 1 {
|
|
return io.ErrUnexpectedEOF
|
|
}
|
|
ver := int(b[0])
|
|
switch ver {
|
|
case dataUsageCacheVerV1:
|
|
return errors.New("cache version deprecated (will autoupdate)")
|
|
case dataUsageCacheVerV2:
|
|
// Zstd compressed.
|
|
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer dec.Close()
|
|
|
|
dold := &dataUsageCacheV2{}
|
|
if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil {
|
|
return err
|
|
}
|
|
d.Info = dold.Info
|
|
d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
|
|
for k, v := range dold.Cache {
|
|
d.Cache[k] = dataUsageEntry{
|
|
Size: v.Size,
|
|
Objects: v.Objects,
|
|
ObjSizes: v.ObjSizes,
|
|
Children: v.Children,
|
|
Compacted: len(v.Children) == 0 && k != d.Info.Name,
|
|
}
|
|
}
|
|
return nil
|
|
case dataUsageCacheVerV3:
|
|
// Zstd compressed.
|
|
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer dec.Close()
|
|
dold := &dataUsageCacheV3{}
|
|
if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil {
|
|
return err
|
|
}
|
|
d.Info = dold.Info
|
|
d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
|
|
for k, v := range dold.Cache {
|
|
due := dataUsageEntry{
|
|
Size: v.Size,
|
|
Objects: v.Objects,
|
|
ObjSizes: v.ObjSizes,
|
|
Children: v.Children,
|
|
}
|
|
if v.ReplicatedSize > 0 || v.ReplicaSize > 0 || v.ReplicationFailedSize > 0 || v.ReplicationPendingSize > 0 {
|
|
cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name)
|
|
if cfg != nil && cfg.RoleArn != "" {
|
|
due.ReplicationStats = &replicationAllStats{
|
|
Targets: make(map[string]replicationStats),
|
|
}
|
|
due.ReplicationStats.ReplicaSize = v.ReplicaSize
|
|
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
|
|
ReplicatedSize: v.ReplicatedSize,
|
|
FailedSize: v.ReplicationFailedSize,
|
|
PendingSize: v.ReplicationPendingSize,
|
|
}
|
|
}
|
|
}
|
|
due.Compacted = len(due.Children) == 0 && k != d.Info.Name
|
|
|
|
d.Cache[k] = due
|
|
}
|
|
return nil
|
|
case dataUsageCacheVerV4:
|
|
// Zstd compressed.
|
|
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer dec.Close()
|
|
dold := &dataUsageCacheV4{}
|
|
if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil {
|
|
return err
|
|
}
|
|
d.Info = dold.Info
|
|
d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
|
|
for k, v := range dold.Cache {
|
|
due := dataUsageEntry{
|
|
Size: v.Size,
|
|
Objects: v.Objects,
|
|
ObjSizes: v.ObjSizes,
|
|
Children: v.Children,
|
|
}
|
|
empty := replicationStatsV1{}
|
|
|
|
if v.ReplicationStats != empty {
|
|
cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name)
|
|
if cfg != nil && cfg.RoleArn != "" {
|
|
due.ReplicationStats = &replicationAllStats{
|
|
Targets: make(map[string]replicationStats),
|
|
}
|
|
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
|
|
ReplicatedSize: v.ReplicationStats.ReplicatedSize,
|
|
FailedSize: v.ReplicationStats.FailedSize,
|
|
FailedCount: v.ReplicationStats.FailedCount,
|
|
PendingSize: v.ReplicationStats.PendingSize,
|
|
PendingCount: v.ReplicationStats.PendingCount,
|
|
}
|
|
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
|
|
}
|
|
}
|
|
due.Compacted = len(due.Children) == 0 && k != d.Info.Name
|
|
|
|
d.Cache[k] = due
|
|
}
|
|
|
|
// Populate compacted value and remove unneeded replica stats.
|
|
for k, e := range d.Cache {
|
|
if e.ReplicationStats != nil && len(e.ReplicationStats.Targets) == 0 {
|
|
e.ReplicationStats = nil
|
|
}
|
|
d.Cache[k] = e
|
|
}
|
|
return nil
|
|
case dataUsageCacheVerV5:
|
|
// Zstd compressed.
|
|
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer dec.Close()
|
|
dold := &dataUsageCacheV5{}
|
|
if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil {
|
|
return err
|
|
}
|
|
d.Info = dold.Info
|
|
d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
|
|
for k, v := range dold.Cache {
|
|
due := dataUsageEntry{
|
|
Size: v.Size,
|
|
Objects: v.Objects,
|
|
ObjSizes: v.ObjSizes,
|
|
Children: v.Children,
|
|
}
|
|
if v.ReplicationStats != nil && !v.ReplicationStats.Empty() {
|
|
cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name)
|
|
if cfg != nil && cfg.RoleArn != "" {
|
|
due.ReplicationStats = &replicationAllStats{
|
|
Targets: make(map[string]replicationStats),
|
|
}
|
|
d.Info.replication = replicationConfig{Config: cfg}
|
|
|
|
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
|
|
ReplicatedSize: v.ReplicationStats.ReplicatedSize,
|
|
FailedSize: v.ReplicationStats.FailedSize,
|
|
FailedCount: v.ReplicationStats.FailedCount,
|
|
PendingSize: v.ReplicationStats.PendingSize,
|
|
PendingCount: v.ReplicationStats.PendingCount,
|
|
}
|
|
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
|
|
}
|
|
}
|
|
due.Compacted = len(due.Children) == 0 && k != d.Info.Name
|
|
|
|
d.Cache[k] = due
|
|
}
|
|
|
|
// Populate compacted value and remove unneeded replica stats.
|
|
for k, e := range d.Cache {
|
|
if e.ReplicationStats != nil && len(e.ReplicationStats.Targets) == 0 {
|
|
e.ReplicationStats = nil
|
|
}
|
|
d.Cache[k] = e
|
|
}
|
|
return nil
|
|
case dataUsageCacheVerV6:
|
|
// Zstd compressed.
|
|
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer dec.Close()
|
|
dold := &dataUsageCacheV6{}
|
|
if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil {
|
|
return err
|
|
}
|
|
d.Info = dold.Info
|
|
d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
|
|
for k, v := range dold.Cache {
|
|
var replicationStats *replicationAllStats
|
|
if v.ReplicationStats != nil {
|
|
replicationStats = &replicationAllStats{
|
|
Targets: v.ReplicationStats.Targets,
|
|
ReplicaSize: v.ReplicationStats.ReplicaSize,
|
|
ReplicaCount: v.ReplicationStats.ReplicaCount,
|
|
}
|
|
}
|
|
due := dataUsageEntry{
|
|
Children: v.Children,
|
|
Size: v.Size,
|
|
Objects: v.Objects,
|
|
Versions: v.Versions,
|
|
ObjSizes: v.ObjSizes,
|
|
ReplicationStats: replicationStats,
|
|
Compacted: v.Compacted,
|
|
}
|
|
d.Cache[k] = due
|
|
}
|
|
return nil
|
|
case dataUsageCacheVerV7:
|
|
// Zstd compressed.
|
|
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer dec.Close()
|
|
dold := &dataUsageCacheV7{}
|
|
if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil {
|
|
return err
|
|
}
|
|
d.Info = dold.Info
|
|
d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
|
|
for k, v := range dold.Cache {
|
|
var szHist sizeHistogram
|
|
szHist.mergeV1(v.ObjSizes)
|
|
d.Cache[k] = dataUsageEntry{
|
|
Children: v.Children,
|
|
Size: v.Size,
|
|
Objects: v.Objects,
|
|
Versions: v.Versions,
|
|
ObjSizes: szHist,
|
|
ReplicationStats: v.ReplicationStats,
|
|
Compacted: v.Compacted,
|
|
}
|
|
}
|
|
|
|
return nil
|
|
case dataUsageCacheVerCurrent:
|
|
// Zstd compressed.
|
|
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer dec.Close()
|
|
return d.DecodeMsg(msgp.NewReader(dec))
|
|
default:
|
|
return fmt.Errorf("dataUsageCache: unknown version: %d", ver)
|
|
}
|
|
}
|
|
|
|
// Trim this from start+end of hashes.
|
|
var hashPathCutSet = dataUsageRoot
|
|
|
|
func init() {
|
|
if dataUsageRoot != string(filepath.Separator) {
|
|
hashPathCutSet = dataUsageRoot + string(filepath.Separator)
|
|
}
|
|
}
|
|
|
|
// hashPath calculates a hash of the provided string.
|
|
func hashPath(data string) dataUsageHash {
|
|
if data != dataUsageRoot {
|
|
data = strings.Trim(data, hashPathCutSet)
|
|
}
|
|
return dataUsageHash(path.Clean(data))
|
|
}
|
|
|
|
//msgp:ignore dataUsageHashMap
|
|
type dataUsageHashMap map[string]struct{}
|
|
|
|
// DecodeMsg implements msgp.Decodable
|
|
func (z *dataUsageHashMap) DecodeMsg(dc *msgp.Reader) (err error) {
|
|
var zb0002 uint32
|
|
zb0002, err = dc.ReadArrayHeader()
|
|
if err != nil {
|
|
err = msgp.WrapError(err)
|
|
return
|
|
}
|
|
if zb0002 == 0 {
|
|
*z = nil
|
|
return
|
|
}
|
|
*z = make(dataUsageHashMap, zb0002)
|
|
for i := uint32(0); i < zb0002; i++ {
|
|
{
|
|
var zb0003 string
|
|
zb0003, err = dc.ReadString()
|
|
if err != nil {
|
|
err = msgp.WrapError(err)
|
|
return
|
|
}
|
|
(*z)[zb0003] = struct{}{}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// EncodeMsg implements msgp.Encodable
|
|
func (z dataUsageHashMap) EncodeMsg(en *msgp.Writer) (err error) {
|
|
err = en.WriteArrayHeader(uint32(len(z)))
|
|
if err != nil {
|
|
err = msgp.WrapError(err)
|
|
return
|
|
}
|
|
for zb0004 := range z {
|
|
err = en.WriteString(zb0004)
|
|
if err != nil {
|
|
err = msgp.WrapError(err, zb0004)
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// MarshalMsg implements msgp.Marshaler
|
|
func (z dataUsageHashMap) MarshalMsg(b []byte) (o []byte, err error) {
|
|
o = msgp.Require(b, z.Msgsize())
|
|
o = msgp.AppendArrayHeader(o, uint32(len(z)))
|
|
for zb0004 := range z {
|
|
o = msgp.AppendString(o, zb0004)
|
|
}
|
|
return
|
|
}
|
|
|
|
// UnmarshalMsg implements msgp.Unmarshaler
|
|
func (z *dataUsageHashMap) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
|
var zb0002 uint32
|
|
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
|
|
if err != nil {
|
|
err = msgp.WrapError(err)
|
|
return
|
|
}
|
|
if zb0002 == 0 {
|
|
*z = nil
|
|
return bts, nil
|
|
}
|
|
*z = make(dataUsageHashMap, zb0002)
|
|
for i := uint32(0); i < zb0002; i++ {
|
|
{
|
|
var zb0003 string
|
|
zb0003, bts, err = msgp.ReadStringBytes(bts)
|
|
if err != nil {
|
|
err = msgp.WrapError(err)
|
|
return
|
|
}
|
|
(*z)[zb0003] = struct{}{}
|
|
}
|
|
}
|
|
o = bts
|
|
return
|
|
}
|
|
|
|
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
|
|
func (z dataUsageHashMap) Msgsize() (s int) {
|
|
s = msgp.ArrayHeaderSize
|
|
for zb0004 := range z {
|
|
s += msgp.StringPrefixSize + len(zb0004)
|
|
}
|
|
return
|
|
}
|
|
|
|
//msgp:encode ignore currentScannerCycle
|
|
//msgp:decode ignore currentScannerCycle
|
|
|
|
type currentScannerCycle struct {
|
|
current uint64
|
|
next uint64
|
|
started time.Time
|
|
cycleCompleted []time.Time
|
|
}
|
|
|
|
// clone returns a clone.
|
|
func (z currentScannerCycle) clone() currentScannerCycle {
|
|
z.cycleCompleted = append(make([]time.Time, 0, len(z.cycleCompleted)), z.cycleCompleted...)
|
|
return z
|
|
}
|