minio/cmd/data-usage.go

167 lines
5.6 KiB
Go
Raw Normal View History

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"errors"
"strings"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/internal/cachevalue"
"github.com/minio/minio/internal/logger"
)
const (
dataUsageRoot = SlashSeparator
dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix
dataUsageObjName = ".usage.json"
dataUsageObjNamePath = bucketMetaPrefix + SlashSeparator + dataUsageObjName
dataUsageBloomName = ".bloomcycle.bin"
dataUsageBloomNamePath = bucketMetaPrefix + SlashSeparator + dataUsageBloomName
backgroundHealInfoPath = bucketMetaPrefix + SlashSeparator + ".background-heal.json"
dataUsageCacheName = ".usage-cache.bin"
)
// storeDataUsageInBackend will store all objects sent on the dui channel until closed.
func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan DataUsageInfo) {
attempts := 1
for dataUsageInfo := range dui {
json := jsoniter.ConfigCompatibleWithStandardLibrary
dataUsageJSON, err := json.Marshal(dataUsageInfo)
if err != nil {
logger.LogIf(ctx, err)
continue
}
if attempts > 10 {
saveConfig(ctx, objAPI, dataUsageObjNamePath+".bkp", dataUsageJSON) // Save a backup every 10th update.
attempts = 1
}
if err = saveConfig(ctx, objAPI, dataUsageObjNamePath, dataUsageJSON); err != nil {
logger.LogOnceIf(ctx, err, dataUsageObjNamePath)
}
attempts++
}
}
var prefixUsageCache = cachevalue.New[map[string]uint64]()
// loadPrefixUsageFromBackend returns prefix usages found in passed buckets
//
// e.g.: /testbucket/prefix => 355601334
func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket string) (map[string]uint64, error) {
z, ok := objAPI.(*erasureServerPools)
if !ok {
// Prefix usage is empty
return map[string]uint64{}, nil
}
cache := dataUsageCache{}
prefixUsageCache.InitOnce(30*time.Second,
// No need to fail upon Update() error, fallback to old value.
cachevalue.Opts{ReturnLastGood: true, NoWait: true},
func() (map[string]uint64, error) {
m := make(map[string]uint64)
for _, pool := range z.serverPools {
for _, er := range pool.sets {
// Load bucket usage prefixes
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
ok := cache.load(ctx, er, bucket+slashSeparator+dataUsageCacheName) == nil
done()
if ok {
root := cache.find(bucket)
if root == nil {
// We dont have usage information for this bucket in this
// set, go to the next set
continue
}
for id, usageInfo := range cache.flattenChildrens(*root) {
prefix := decodeDirObject(strings.TrimPrefix(id, bucket+slashSeparator))
// decodeDirObject to avoid any __XLDIR__ objects
m[prefix] += uint64(usageInfo.Size)
}
}
}
}
return m, nil
},
)
return prefixUsageCache.Get()
}
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
buf, err := readConfig(ctx, objAPI, dataUsageObjNamePath)
if err != nil {
buf, err = readConfig(ctx, objAPI, dataUsageObjNamePath+".bkp")
if err != nil {
if errors.Is(err, errConfigNotFound) {
return DataUsageInfo{}, nil
}
return DataUsageInfo{}, toObjectErr(err, minioMetaBucket, dataUsageObjNamePath)
}
}
var dataUsageInfo DataUsageInfo
json := jsoniter.ConfigCompatibleWithStandardLibrary
if err = json.Unmarshal(buf, &dataUsageInfo); err != nil {
return DataUsageInfo{}, err
}
// For forward compatibility reasons, we need to add this code.
if len(dataUsageInfo.BucketsUsage) == 0 {
dataUsageInfo.BucketsUsage = make(map[string]BucketUsageInfo, len(dataUsageInfo.BucketSizes))
for bucket, size := range dataUsageInfo.BucketSizes {
dataUsageInfo.BucketsUsage[bucket] = BucketUsageInfo{Size: size}
}
}
// For backward compatibility reasons, we need to add this code.
if len(dataUsageInfo.BucketSizes) == 0 {
dataUsageInfo.BucketSizes = make(map[string]uint64, len(dataUsageInfo.BucketsUsage))
for bucket, bui := range dataUsageInfo.BucketsUsage {
dataUsageInfo.BucketSizes[bucket] = bui.Size
}
}
// For forward compatibility reasons, we need to add this code.
for bucket, bui := range dataUsageInfo.BucketsUsage {
if bui.ReplicatedSizeV1 > 0 || bui.ReplicationFailedCountV1 > 0 ||
bui.ReplicationFailedSizeV1 > 0 || bui.ReplicationPendingCountV1 > 0 {
cfg, _ := getReplicationConfig(GlobalContext, bucket)
if cfg != nil && cfg.RoleArn != "" {
if dataUsageInfo.ReplicationInfo == nil {
dataUsageInfo.ReplicationInfo = make(map[string]BucketTargetUsageInfo)
}
dataUsageInfo.ReplicationInfo[cfg.RoleArn] = BucketTargetUsageInfo{
ReplicationFailedSize: bui.ReplicationFailedSizeV1,
ReplicationFailedCount: bui.ReplicationFailedCountV1,
ReplicatedSize: bui.ReplicatedSizeV1,
ReplicationPendingCount: bui.ReplicationPendingCountV1,
ReplicationPendingSize: bui.ReplicationPendingSizeV1,
}
}
}
}
return dataUsageInfo, nil
}