Mirror of https://github.com/minio/minio.git
Add data usage collect with its new admin API (#8553)
Admin data usage info API returns the following (only FS & XL, for now):
- Number of buckets
- Number of objects
- Total size of objects
- Objects histogram
- Bucket sizes
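For orientation, the sketch below shows roughly what such a payload looks like, built only from the DataUsageInfo fields this diff populates (BucketsCount, ObjectsCount, ObjectsTotalSize, ObjectsSizesHistogram, BucketsSizes, LastUpdate). It is a self-contained illustration: the JSON tag names and the histogram interval key are assumptions, not the authoritative definitions elsewhere in the tree.

// Hypothetical sketch of the data usage payload. The field set mirrors this
// diff; the JSON tags and the histogram interval name are illustrative
// assumptions, not the server's actual definitions.
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type DataUsageInfo struct {
	LastUpdate            time.Time         `json:"lastUpdate"`
	ObjectsCount          uint64            `json:"objectsCount"`
	ObjectsTotalSize      uint64            `json:"objectsTotalSize"`
	ObjectsSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"`
	BucketsCount          uint64            `json:"bucketsCount"`
	BucketsSizes          map[string]uint64 `json:"bucketsSizes"`
}

func main() {
	info := DataUsageInfo{
		LastUpdate:            time.Now().UTC(),
		ObjectsCount:          3,
		ObjectsTotalSize:      3 << 20,
		ObjectsSizesHistogram: map[string]uint64{"BETWEEN_1_KiB_AND_1_MiB": 3}, // interval name is illustrative
		BucketsCount:          1,
		BucketsSizes:          map[string]uint64{"mybucket": 3 << 20},
	}
	out, _ := json.MarshalIndent(info, "", "  ")
	fmt.Println(string(out))
}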
cmd/fs-v1.go — 248 changed lines
@@ -19,11 +19,13 @@ package cmd
import (
	"bytes"
	"context"
	"errors"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"sync"
@@ -50,6 +52,11 @@ type FSObjects struct {
	// Disk usage metrics
	totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	// The count of concurrent calls on FSObjects API
	activeIOCount int64
	// The active IO count ceiling for crawling to work
	maxActiveIOCount int64

	// Path to be exported over S3 API.
	fsPath string
	// meta json filename, varies by fs / cache backend.
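The two counters added above drive a simple throttling scheme that recurs through the rest of this diff: every bucket/object API call increments activeIOCount on entry and decrements it in a defer, and the background crawler only proceeds while the count is below maxActiveIOCount. (The ref comment on totalUsed points at the sync/atomic package note: 64-bit fields accessed atomically must be 64-bit aligned on 32-bit platforms, which is why these counters sit at the top of the struct.) Below is a standalone sketch of that accounting pattern; the server type and handler are hypothetical, only the pattern matches the diff.

// Standalone sketch of the accounting pattern used throughout this diff:
// each call bumps an atomic counter on entry and decrements it on exit,
// so a background crawler can yield while foreground IO is busy.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type server struct {
	activeIOCount    int64 // incremented/decremented around every API call
	maxActiveIOCount int64 // crawler backs off at or above this ceiling
}

func (s *server) handleRequest(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	atomic.AddInt64(&s.activeIOCount, 1)
	defer atomic.AddInt64(&s.activeIOCount, -1)
	fmt.Printf("request %d sees %d active calls\n", id, atomic.LoadInt64(&s.activeIOCount))
}

func main() {
	s := &server{maxActiveIOCount: 10}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go s.handleRequest(i, &wg)
	}
	wg.Wait()
	fmt.Println("idle enough to crawl:", atomic.LoadInt64(&s.activeIOCount) < s.maxActiveIOCount)
}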
@@ -100,6 +107,11 @@ func initMetaVolumeFS(fsPath, fsUUID string) error {
		return err
	}

	metaStatsPath := pathJoin(fsPath, minioMetaBackgroundOpsBucket, fsUUID)
	if err := os.MkdirAll(metaStatsPath, 0777); err != nil {
		return err
	}

	metaMultipartPath := pathJoin(fsPath, minioMetaMultipartBucket)
	return os.MkdirAll(metaMultipartPath, 0777)
@@ -147,6 +159,8 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
		listPool:      NewTreeWalkPool(globalLookupTimeout),
		appendFileMap: make(map[string]*fsAppendFile),
		diskMount:     mountinfo.IsLikelyMountPoint(fsPath),

		maxActiveIOCount: 10,
	}

	// Once the filesystem has initialized hold the read lock for
@@ -155,10 +169,6 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
	// or cause changes on backend format.
	fs.fsFormatRlk = rlk

	if !fs.diskMount {
		go fs.diskUsage(GlobalServiceDoneCh)
	}

	go fs.cleanupStaleMultipartUploads(ctx, GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh)

	// Return successfully initialized object layer.
@@ -179,77 +189,14 @@ func (fs *FSObjects) Shutdown(ctx context.Context) error {
	return fsRemoveAll(ctx, pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID))
}

// diskUsage returns du information for the posix path, in a continuous routine.
func (fs *FSObjects) diskUsage(doneCh chan struct{}) {
	usageFn := func(ctx context.Context, entry string) error {
		if httpServer := newHTTPServerFn(); httpServer != nil {
			// Wait at max 1 minute for an inprogress request
			// before proceeding to count the usage.
			waitCount := 60
			// Any requests in progress, delay the usage.
			for httpServer.GetRequestCount() > 0 && waitCount > 0 {
				waitCount--
				time.Sleep(1 * time.Second)
			}
		}

		select {
		case <-doneCh:
			return errWalkAbort
		default:
			fi, err := os.Stat(entry)
			if err != nil {
				err = osErrToFSFileErr(err)
				return err
			}
			atomic.AddUint64(&fs.totalUsed, uint64(fi.Size()))
		}
		return nil
	}

	// Return this routine upon errWalkAbort, continue for any other error on purpose
	// so that we can start the routine freshly in another 12 hours.
	if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err == errWalkAbort {
		return
	}

	for {
		select {
		case <-doneCh:
			return
		case <-time.After(globalUsageCheckInterval):
			var usage uint64
			usageFn = func(ctx context.Context, entry string) error {
				if httpServer := newHTTPServerFn(); httpServer != nil {
					// Wait at max 1 minute for an inprogress request
					// before proceeding to count the usage.
					waitCount := 60
					// Any requests in progress, delay the usage.
					for httpServer.GetRequestCount() > 0 && waitCount > 0 {
						waitCount--
						time.Sleep(1 * time.Second)
					}
				}

				fi, err := os.Stat(entry)
				if err != nil {
					err = osErrToFSFileErr(err)
					return err
				}
				usage = usage + uint64(fi.Size())
				return nil
			}

			if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil {
				continue
			}
			atomic.StoreUint64(&fs.totalUsed, usage)
		}
	}
}

// StorageInfo - returns underlying storage statistics.
func (fs *FSObjects) StorageInfo(ctx context.Context) StorageInfo {

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	di, err := getDiskInfo(fs.fsPath)
	if err != nil {
		return StorageInfo{}
@@ -258,6 +205,7 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) StorageInfo {
	if !fs.diskMount {
		used = atomic.LoadUint64(&fs.totalUsed)
	}

	storageInfo := StorageInfo{
		Used:  []uint64{used},
		Total: []uint64{di.Total},
@@ -268,6 +216,98 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) StorageInfo {
	return storageInfo
}

func (fs *FSObjects) waitForLowActiveIO() error {
	t := time.NewTicker(lowActiveIOWaitTick)
	defer t.Stop()
	for {
		if atomic.LoadInt64(&fs.activeIOCount) >= fs.maxActiveIOCount {
			select {
			case <-GlobalServiceDoneCh:
				return errors.New("forced exit")
			case <-t.C:
				continue
			}
		}
		break
	}

	return nil

}

// crawlAndGetDataUsageInfo returns data usage stats of the current FS deployment
func (fs *FSObjects) crawlAndGetDataUsageInfo(ctx context.Context, endCh <-chan struct{}) DataUsageInfo {

	var dataUsageInfoMu sync.Mutex
	var dataUsageInfo = DataUsageInfo{
		BucketsSizes:          make(map[string]uint64),
		ObjectsSizesHistogram: make(map[string]uint64),
	}

	walkFn := func(origPath string, typ os.FileMode) error {

		select {
		case <-GlobalServiceDoneCh:
			return filepath.SkipDir
		default:
		}

		if err := fs.waitForLowActiveIO(); err != nil {
			return filepath.SkipDir
		}

		path := strings.TrimPrefix(origPath, fs.fsPath)
		path = strings.TrimPrefix(path, SlashSeparator)

		splits := splitN(path, SlashSeparator, 2)
		bucket := splits[0]
		prefix := splits[1]

		if bucket == "" {
			return nil
		}

		if isReservedOrInvalidBucket(bucket, false) {
			return filepath.SkipDir
		}

		if prefix == "" {
			dataUsageInfoMu.Lock()
			dataUsageInfo.BucketsCount++
			dataUsageInfo.BucketsSizes[bucket] = 0
			dataUsageInfoMu.Unlock()
			return nil
		}

		if typ&os.ModeDir != 0 {
			return nil
		}

		// Get file size
		fi, err := os.Stat(origPath)
		if err != nil {
			return nil
		}
		size := fi.Size()

		dataUsageInfoMu.Lock()
		dataUsageInfo.ObjectsCount++
		dataUsageInfo.ObjectsTotalSize += uint64(size)
		dataUsageInfo.BucketsSizes[bucket] += uint64(size)
		dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(size))]++
		dataUsageInfoMu.Unlock()

		return nil
	}

	fastWalk(fs.fsPath, walkFn)

	dataUsageInfo.LastUpdate = UTCNow()
	atomic.StoreUint64(&fs.totalUsed, dataUsageInfo.ObjectsTotalSize)

	return dataUsageInfo
}

/// Bucket operations

// getBucketDir - will convert incoming bucket names to
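crawlAndGetDataUsageInfo above walks the export path with fastWalk, derives the bucket and object prefix from each path, and aggregates counts, sizes, and a size histogram under a mutex (fastWalk invokes the callback from multiple goroutines). The stdlib-only sketch below mimics that aggregation with filepath.Walk over a <root>/<bucket>/<object> layout; filepath.Walk is single-goroutine, so the mutex is omitted, and the histogram intervals are illustrative rather than the server's actual objSizeToHistoInterval buckets.

// Simplified, stdlib-only analogue of crawlAndGetDataUsageInfo: walk an
// export directory laid out as <root>/<bucket>/<object...> and aggregate
// object counts, total size, per-bucket sizes, and a size histogram.
// filepath.Walk stands in for MinIO's fastWalk; interval names are made up.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

type usage struct {
	BucketsCount          uint64
	ObjectsCount          uint64
	ObjectsTotalSize      uint64
	BucketsSizes          map[string]uint64
	ObjectsSizesHistogram map[string]uint64
}

func sizeInterval(sz uint64) string {
	switch {
	case sz < 1024:
		return "LESS_THAN_1_KiB"
	case sz < 1024*1024:
		return "1_KiB_TO_1_MiB"
	default:
		return "GREATER_THAN_1_MiB"
	}
}

func crawl(root string) (usage, error) {
	u := usage{
		BucketsSizes:          make(map[string]uint64),
		ObjectsSizesHistogram: make(map[string]uint64),
	}
	err := filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			return nil // skip unreadable entries, keep walking
		}
		rel, rerr := filepath.Rel(root, path)
		if rerr != nil || rel == "." {
			return nil
		}
		parts := strings.SplitN(filepath.ToSlash(rel), "/", 2)
		bucket := parts[0]
		if len(parts) == 1 {
			if fi.IsDir() { // top-level directory == bucket
				u.BucketsCount++
				u.BucketsSizes[bucket] = 0
			}
			return nil
		}
		if fi.IsDir() {
			return nil
		}
		sz := uint64(fi.Size())
		u.ObjectsCount++
		u.ObjectsTotalSize += sz
		u.BucketsSizes[bucket] += sz
		u.ObjectsSizesHistogram[sizeInterval(sz)]++
		return nil
	})
	return u, err
}

func main() {
	u, err := crawl("/tmp/minio-export") // hypothetical export path
	if err != nil {
		fmt.Println("walk error:", err)
		return
	}
	fmt.Printf("%+v\n", u)
}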
@@ -305,6 +345,12 @@ func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket, locatio
	if s3utils.CheckValidBucketNameStrict(bucket) != nil {
		return BucketNameInvalid{Bucket: bucket}
	}

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	bucketDir, err := fs.getBucketDir(ctx, bucket)
	if err != nil {
		return toObjectErr(err, bucket)
@@ -324,6 +370,12 @@ func (fs *FSObjects) GetBucketInfo(ctx context.Context, bucket string) (bi Bucke
		return bi, e
	}
	defer bucketLock.RUnlock()

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	st, err := fs.statBucketDir(ctx, bucket)
	if err != nil {
		return bi, toObjectErr(err, bucket)
@@ -343,6 +395,12 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
		logger.LogIf(ctx, err)
		return nil, err
	}

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	var bucketInfos []BucketInfo
	entries, err := readDir((fs.fsPath))
	if err != nil {
@@ -388,6 +446,12 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error {
		return err
	}
	defer bucketLock.Unlock()

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	bucketDir, err := fs.getBucketDir(ctx, bucket)
	if err != nil {
		return toObjectErr(err, bucket)
@@ -425,6 +489,11 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
		defer objectDWLock.Unlock()
	}

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	if _, err := fs.statBucketDir(ctx, srcBucket); err != nil {
		return oi, toObjectErr(err, srcBucket)
	}
@@ -482,6 +551,11 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
		return nil, err
	}

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	if _, err = fs.statBucketDir(ctx, bucket); err != nil {
		return nil, toObjectErr(err, bucket)
	}
@@ -583,6 +657,12 @@ func (fs *FSObjects) GetObject(ctx context.Context, bucket, object string, offse
		return err
	}
	defer objectLock.RUnlock()

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	return fs.getObject(ctx, bucket, object, offset, length, writer, etag, true)
}
@@ -768,6 +848,12 @@ func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object s

// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error) {

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
	if err == errCorruptedFormat || err == io.EOF {
		objectLock := fs.NewNSLock(ctx, bucket, object)
@@ -823,6 +909,11 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
	}
	defer objectLock.Unlock()

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	return fs.putObject(ctx, bucket, object, r, opts)
}
@@ -981,6 +1072,11 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) er
		return err
	}

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	if _, err := fs.statBucketDir(ctx, bucket); err != nil {
		return toObjectErr(err, bucket)
	}
@@ -1119,6 +1215,12 @@ func (fs *FSObjects) getObjectETag(ctx context.Context, bucket, entry string, lo
// ListObjects - list all objects at prefix upto maxKeys., optionally delimited by '/'. Maintains the list pool
// state for future re-entrant list requests.
func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {

	atomic.AddInt64(&fs.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&fs.activeIOCount, -1)
	}()

	return listObjects(ctx, fs, bucket, prefix, marker, delimiter, maxKeys, fs.listPool,
		fs.listDirFactory(), fs.getObjectInfo, fs.getObjectInfo)
}