mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
use readConfig/saveConfig to simplify I/O on usage/tracker info (#14019)
This commit is contained in:
parent
9d91d32d82
commit
001b77e7e1
@ -120,7 +120,7 @@ func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string)
|
||||
logger.LogIf(ctx, errors.New("bucket name cannot be empty"))
|
||||
return errors.New("bucket name cannot be empty")
|
||||
}
|
||||
configFile := path.Join(bucketConfigPrefix, name, bucketMetadataFile)
|
||||
configFile := path.Join(bucketMetaPrefix, name, bucketMetadataFile)
|
||||
data, err := readConfig(ctx, api, configFile)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -277,7 +277,7 @@ func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI Obj
|
||||
}
|
||||
|
||||
for _, legacyFile := range legacyConfigs {
|
||||
configFile := path.Join(bucketConfigPrefix, b.Name, legacyFile)
|
||||
configFile := path.Join(bucketMetaPrefix, b.Name, legacyFile)
|
||||
|
||||
configData, err := readConfig(ctx, objectAPI, configFile)
|
||||
if err != nil {
|
||||
@ -338,7 +338,7 @@ func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI Obj
|
||||
}
|
||||
|
||||
for legacyFile := range configs {
|
||||
configFile := path.Join(bucketConfigPrefix, b.Name, legacyFile)
|
||||
configFile := path.Join(bucketMetaPrefix, b.Name, legacyFile)
|
||||
if err := deleteConfig(ctx, objectAPI, configFile); err != nil && !errors.Is(err, errConfigNotFound) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
@ -365,7 +365,7 @@ func (b *BucketMetadata) Save(ctx context.Context, api ObjectLayer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
configFile := path.Join(bucketConfigPrefix, b.Name, bucketMetadataFile)
|
||||
configFile := path.Join(bucketMetaPrefix, b.Name, bucketMetadataFile)
|
||||
return saveConfig(ctx, api, configFile, data)
|
||||
}
|
||||
|
||||
@ -377,7 +377,7 @@ func deleteBucketMetadata(ctx context.Context, obj objectDeleter, bucket string)
|
||||
bucketMetadataFile,
|
||||
}
|
||||
for _, metaFile := range metadataFiles {
|
||||
configFile := path.Join(bucketConfigPrefix, bucket, metaFile)
|
||||
configFile := path.Join(bucketMetaPrefix, bucket, metaFile)
|
||||
if err := deleteConfig(ctx, obj, configFile); err != nil && err != errConfigNotFound {
|
||||
return err
|
||||
}
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
bucketConfigPrefix = "buckets"
|
||||
bucketNotificationConfig = "notification.xml"
|
||||
)
|
||||
|
||||
|
@ -29,8 +29,8 @@ import (
|
||||
|
||||
var errConfigNotFound = errors.New("config file not found")
|
||||
|
||||
func readConfigWithMetadata(ctx context.Context, objAPI ObjectLayer, configFile string) ([]byte, ObjectInfo, error) {
|
||||
r, err := objAPI.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, readLock, ObjectOptions{})
|
||||
func readConfigWithMetadata(ctx context.Context, store objectIO, configFile string) ([]byte, ObjectInfo, error) {
|
||||
r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, readLock, ObjectOptions{})
|
||||
if err != nil {
|
||||
// Treat object not found as config not found.
|
||||
if isErrObjectNotFound(err) {
|
||||
@ -51,8 +51,8 @@ func readConfigWithMetadata(ctx context.Context, objAPI ObjectLayer, configFile
|
||||
return buf, r.ObjInfo, nil
|
||||
}
|
||||
|
||||
func readConfig(ctx context.Context, objAPI ObjectLayer, configFile string) ([]byte, error) {
|
||||
buf, _, err := readConfigWithMetadata(ctx, objAPI, configFile)
|
||||
func readConfig(ctx context.Context, store objectIO, configFile string) ([]byte, error) {
|
||||
buf, _, err := readConfigWithMetadata(ctx, store, configFile)
|
||||
return buf, err
|
||||
}
|
||||
|
||||
@ -70,13 +70,13 @@ func deleteConfig(ctx context.Context, objAPI objectDeleter, configFile string)
|
||||
return err
|
||||
}
|
||||
|
||||
func saveConfig(ctx context.Context, objAPI ObjectLayer, configFile string, data []byte) error {
|
||||
func saveConfig(ctx context.Context, store objectIO, configFile string, data []byte) error {
|
||||
hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = objAPI.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), ObjectOptions{MaxParity: true})
|
||||
_, err = store.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), ObjectOptions{MaxParity: true})
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"io/fs"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
@ -40,7 +39,6 @@ import (
|
||||
"github.com/minio/minio/internal/color"
|
||||
"github.com/minio/minio/internal/config/heal"
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/console"
|
||||
)
|
||||
@ -111,6 +109,9 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
|
||||
locker := objAPI.NewNSLock(minioMetaBucket, "scanner/runDataScanner.lock")
|
||||
lkctx, err := locker.GetLock(pctx, dataScannerLeaderLockTimeout)
|
||||
if err != nil {
|
||||
if intDataUpdateTracker.debug {
|
||||
logger.LogIf(pctx, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
ctx := lkctx.Context()
|
||||
@ -120,18 +121,11 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
|
||||
// Load current bloom cycle
|
||||
nextBloomCycle := intDataUpdateTracker.current() + 1
|
||||
|
||||
br, err := objAPI.GetObjectNInfo(ctx, dataUsageBucket, dataUsageBloomName, nil, http.Header{}, readLock, ObjectOptions{})
|
||||
if err != nil {
|
||||
if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) {
|
||||
buf, _ := readConfig(ctx, objAPI, dataUsageBloomNamePath)
|
||||
if len(buf) >= 8 {
|
||||
if err = binary.Read(bytes.NewReader(buf), binary.LittleEndian, &nextBloomCycle); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
} else {
|
||||
if br.ObjInfo.Size == 8 {
|
||||
if err = binary.Read(br, binary.LittleEndian, &nextBloomCycle); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
br.Close()
|
||||
}
|
||||
|
||||
scannerTimer := time.NewTimer(scannerCycle.Get())
|
||||
@ -161,14 +155,7 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
|
||||
nextBloomCycle++
|
||||
var tmp [8]byte
|
||||
binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle)
|
||||
r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)))
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageBloomName, NewPutObjReader(r), ObjectOptions{})
|
||||
if !isErrBucketNotFound(err) {
|
||||
if err = saveConfig(ctx, objAPI, dataUsageBloomNamePath, tmp[:]); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
|
@ -921,6 +921,7 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string)
|
||||
// Abandon if more than 5 minutes, so we don't hold up scanner.
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, readLock, ObjectOptions{})
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
|
@ -18,13 +18,10 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
)
|
||||
|
||||
@ -33,8 +30,11 @@ const (
|
||||
dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix
|
||||
|
||||
dataUsageObjName = ".usage.json"
|
||||
dataUsageCacheName = ".usage-cache.bin"
|
||||
dataUsageObjNamePath = bucketMetaPrefix + SlashSeparator + dataUsageObjName
|
||||
dataUsageBloomName = ".bloomcycle.bin"
|
||||
dataUsageBloomNamePath = bucketMetaPrefix + SlashSeparator + dataUsageBloomName
|
||||
|
||||
dataUsageCacheName = ".usage-cache.bin"
|
||||
)
|
||||
|
||||
// storeDataUsageInBackend will store all objects sent on the gui channel until closed.
|
||||
@ -46,14 +46,7 @@ func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
size := int64(len(dataUsageJSON))
|
||||
r, err := hash.NewReader(bytes.NewReader(dataUsageJSON), size, "", "", size)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
_, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageObjName, NewPutObjReader(r), ObjectOptions{})
|
||||
if !isErrBucketNotFound(err) {
|
||||
if err = saveConfig(ctx, objAPI, dataUsageObjNamePath, dataUsageJSON); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
@ -95,18 +88,17 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
|
||||
}
|
||||
|
||||
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
|
||||
r, err := objAPI.GetObjectNInfo(ctx, dataUsageBucket, dataUsageObjName, nil, http.Header{}, readLock, ObjectOptions{})
|
||||
buf, err := readConfig(ctx, objAPI, dataUsageObjNamePath)
|
||||
if err != nil {
|
||||
if isErrObjectNotFound(err) || isErrBucketNotFound(err) {
|
||||
return DataUsageInfo{}, nil
|
||||
}
|
||||
return DataUsageInfo{}, toObjectErr(err, dataUsageBucket, dataUsageObjName)
|
||||
return DataUsageInfo{}, toObjectErr(err, minioMetaBucket, dataUsageObjNamePath)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
var dataUsageInfo DataUsageInfo
|
||||
json := jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
if err = json.NewDecoder(r).Decode(&dataUsageInfo); err != nil {
|
||||
if err = json.Unmarshal(buf, &dataUsageInfo); err != nil {
|
||||
return DataUsageInfo{}, err
|
||||
}
|
||||
// For forward compatibility reasons, we need to add this code.
|
||||
|
@ -387,7 +387,7 @@ func TestHealCorrectQuorum(t *testing.T) {
|
||||
t.Fatalf("Failed to complete multipart upload - %v", err)
|
||||
}
|
||||
|
||||
cfgFile := pathJoin(bucketConfigPrefix, bucket, ".test.bin")
|
||||
cfgFile := pathJoin(bucketMetaPrefix, bucket, ".test.bin")
|
||||
if err = saveConfig(ctx, objLayer, cfgFile, data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -1565,7 +1565,7 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
|
||||
}
|
||||
|
||||
// Attempt heal on the bucket metadata, ignore any failures
|
||||
defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketConfigPrefix, bucket, bucketMetadataFile), "", opts)
|
||||
defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, bucket, bucketMetadataFile), "", opts)
|
||||
|
||||
for _, pool := range z.serverPools {
|
||||
result, err := pool.HealBucket(ctx, bucket, opts)
|
||||
|
Loading…
Reference in New Issue
Block a user