mirror of
https://github.com/minio/minio.git
synced 2025-11-09 13:39:46 -05:00
Implement support for calculating disk usage per tenant (#5969)
Fixes #5961
This commit is contained in:
committed by
Nitish Tiwari
parent
483fe4bed5
commit
e6ec645035
@@ -777,7 +777,7 @@ func (c cacheObjects) StorageInfo(ctx context.Context) (storageInfo StorageInfo)
|
||||
if cfs == nil {
|
||||
continue
|
||||
}
|
||||
info, err := getDiskInfo((cfs.fsPath))
|
||||
info, err := getDiskInfo(cfs.fsPath)
|
||||
logger.GetReqInfo(ctx).AppendTags("cachePath", cfs.fsPath)
|
||||
logger.LogIf(ctx, err)
|
||||
total += info.Total
|
||||
|
||||
59
cmd/disk-usage.go
Normal file
59
cmd/disk-usage.go
Normal file
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2018 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
usageCheckInterval = 12 * time.Hour // 12 hours
|
||||
)
|
||||
|
||||
// getDiskUsage walks the file tree rooted at root, calling usageFn
|
||||
// for each file or directory in the tree, including root.
|
||||
func getDiskUsage(ctx context.Context, root string, usageFn usageFunc) error {
|
||||
return walk(ctx, root+slashSeparator, usageFn)
|
||||
}
|
||||
|
||||
type usageFunc func(ctx context.Context, entry string) error
|
||||
|
||||
// walk recursively descends path, calling walkFn.
|
||||
func walk(ctx context.Context, path string, usageFn usageFunc) error {
|
||||
if err := usageFn(ctx, path); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !hasSuffix(path, slashSeparator) {
|
||||
return nil
|
||||
}
|
||||
|
||||
entries, err := readDir(path)
|
||||
if err != nil {
|
||||
return usageFn(ctx, path)
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
fname := pathJoin(path, entry)
|
||||
if err = walk(ctx, fname, usageFn); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -80,7 +80,7 @@ func TestNewMultipartUploadFaultyDisk(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test with disk removed.
|
||||
fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
|
||||
os.RemoveAll(disk)
|
||||
if _, err := fs.NewMultipartUpload(context.Background(), bucketName, objectName, map[string]string{"X-Amz-Meta-xid": "3f"}); err != nil {
|
||||
if !isSameType(err, BucketNotFound{}) {
|
||||
t.Fatal("Unexpected error ", err)
|
||||
|
||||
65
cmd/fs-v1.go
65
cmd/fs-v1.go
@@ -26,6 +26,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
@@ -63,6 +64,10 @@ type FSObjects struct {
|
||||
|
||||
// To manage the appendRoutine go-routines
|
||||
nsMutex *nsLockMap
|
||||
|
||||
// Disk usage metrics
|
||||
totalUsed uint64
|
||||
usageCheckInterval time.Duration
|
||||
}
|
||||
|
||||
// Represents the background append file.
|
||||
@@ -129,9 +134,10 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
|
||||
rwPool: &fsIOPool{
|
||||
readersMap: make(map[string]*lock.RLockedFile),
|
||||
},
|
||||
nsMutex: newNSLock(false),
|
||||
listPool: newTreeWalkPool(globalLookupTimeout),
|
||||
appendFileMap: make(map[string]*fsAppendFile),
|
||||
nsMutex: newNSLock(false),
|
||||
listPool: newTreeWalkPool(globalLookupTimeout),
|
||||
appendFileMap: make(map[string]*fsAppendFile),
|
||||
usageCheckInterval: usageCheckInterval,
|
||||
}
|
||||
|
||||
// Once the filesystem has initialized hold the read lock for
|
||||
@@ -150,6 +156,7 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
|
||||
return nil, uiErrUnableToReadFromBackend(err).Msg("Unable to initialize policy system")
|
||||
}
|
||||
|
||||
go fs.diskUsage(globalServiceDoneCh)
|
||||
go fs.cleanupStaleMultipartUploads(ctx, globalMultipartCleanupInterval, globalMultipartExpiry, globalServiceDoneCh)
|
||||
|
||||
// Return successfully initialized object layer.
|
||||
@@ -164,14 +171,64 @@ func (fs *FSObjects) Shutdown(ctx context.Context) error {
|
||||
return fsRemoveAll(ctx, pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID))
|
||||
}
|
||||
|
||||
// diskUsage returns du information for the posix path, in a continuous routine.
|
||||
func (fs *FSObjects) diskUsage(doneCh chan struct{}) {
|
||||
ticker := time.NewTicker(fs.usageCheckInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
var usage uint64
|
||||
usageFn := func(ctx context.Context, entry string) error {
|
||||
if hasSuffix(entry, slashSeparator) {
|
||||
return nil
|
||||
}
|
||||
fi, err := fsStatFile(ctx, entry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
usage = usage + uint64(fi.Size())
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil {
|
||||
return
|
||||
}
|
||||
atomic.StoreUint64(&fs.totalUsed, usage)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
usage = 0
|
||||
usageFn = func(ctx context.Context, entry string) error {
|
||||
if hasSuffix(entry, slashSeparator) {
|
||||
return nil
|
||||
}
|
||||
fi, err := fsStatFile(ctx, entry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
usage = usage + uint64(fi.Size())
|
||||
return nil
|
||||
}
|
||||
if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil {
|
||||
continue
|
||||
}
|
||||
atomic.StoreUint64(&fs.totalUsed, usage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StorageInfo - returns underlying storage statistics.
|
||||
func (fs *FSObjects) StorageInfo(ctx context.Context) StorageInfo {
|
||||
info, err := getDiskInfo((fs.fsPath))
|
||||
info, err := getDiskInfo(fs.fsPath)
|
||||
logger.GetReqInfo(ctx).AppendTags("path", fs.fsPath)
|
||||
logger.LogIf(ctx, err)
|
||||
|
||||
storageInfo := StorageInfo{
|
||||
Total: info.Total,
|
||||
Free: info.Free,
|
||||
Used: atomic.LoadUint64(&fs.totalUsed),
|
||||
}
|
||||
storageInfo.Backend.Type = FS
|
||||
return storageInfo
|
||||
|
||||
@@ -18,8 +18,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
)
|
||||
|
||||
// naughtyDisk wraps a POSIX disk and returns programmed errors
|
||||
@@ -74,7 +72,7 @@ func (d *naughtyDisk) calcError() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) DiskInfo() (info disk.Info, err error) {
|
||||
func (d *naughtyDisk) DiskInfo() (info DiskInfo, err error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return info, err
|
||||
}
|
||||
|
||||
@@ -39,10 +39,10 @@ const (
|
||||
|
||||
// StorageInfo - represents total capacity of underlying storage.
|
||||
type StorageInfo struct {
|
||||
// Total disk space.
|
||||
Total uint64
|
||||
// Free available disk space.
|
||||
Free uint64
|
||||
Total uint64 // Total disk space.
|
||||
Free uint64 // Free available space.
|
||||
Used uint64 // Used total used per tenant.
|
||||
|
||||
// Backend type.
|
||||
Backend struct {
|
||||
// Represents various backend types, currently on FS and Erasure.
|
||||
|
||||
93
cmd/posix.go
93
cmd/posix.go
@@ -29,6 +29,7 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
@@ -47,6 +48,11 @@ type posix struct {
|
||||
diskPath string
|
||||
pool sync.Pool
|
||||
connected bool
|
||||
|
||||
// Disk usage metrics
|
||||
stopUsageCh chan struct{}
|
||||
totalUsage uint64
|
||||
usageCheckInterval time.Duration
|
||||
}
|
||||
|
||||
// checkPathLength - returns error if given path name length more than 255
|
||||
@@ -128,6 +134,7 @@ func isDirEmpty(dirname string) bool {
|
||||
return false
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// List one entry.
|
||||
_, err = f.Readdirnames(1)
|
||||
if err != io.EOF {
|
||||
@@ -157,9 +164,14 @@ func newPosix(path string) (StorageAPI, error) {
|
||||
return &b
|
||||
},
|
||||
},
|
||||
stopUsageCh: make(chan struct{}),
|
||||
usageCheckInterval: usageCheckInterval,
|
||||
}
|
||||
|
||||
st.connected = true
|
||||
|
||||
go st.diskUsage()
|
||||
|
||||
// Success.
|
||||
return st, nil
|
||||
}
|
||||
@@ -242,6 +254,7 @@ func (s *posix) String() string {
|
||||
}
|
||||
|
||||
func (s *posix) Close() error {
|
||||
close(s.stopUsageCh)
|
||||
s.connected = false
|
||||
return nil
|
||||
}
|
||||
@@ -250,10 +263,26 @@ func (s *posix) IsOnline() bool {
|
||||
return s.connected
|
||||
}
|
||||
|
||||
// DiskInfo is an extended type which returns current
|
||||
// disk usage per path.
|
||||
type DiskInfo struct {
|
||||
Total uint64
|
||||
Free uint64
|
||||
Used uint64
|
||||
}
|
||||
|
||||
// DiskInfo provides current information about disk space usage,
|
||||
// total free inodes and underlying filesystem.
|
||||
func (s *posix) DiskInfo() (info disk.Info, err error) {
|
||||
return getDiskInfo((s.diskPath))
|
||||
func (s *posix) DiskInfo() (info DiskInfo, err error) {
|
||||
di, err := getDiskInfo(s.diskPath)
|
||||
if err != nil {
|
||||
return info, err
|
||||
}
|
||||
return DiskInfo{
|
||||
Total: di.Total,
|
||||
Free: di.Free,
|
||||
Used: atomic.LoadUint64(&s.totalUsage),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// getVolDir - will convert incoming volume names to
|
||||
@@ -285,6 +314,66 @@ func (s *posix) checkDiskFound() (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// diskUsage returns du information for the posix path, in a continuous routine.
|
||||
func (s *posix) diskUsage() {
|
||||
ticker := time.NewTicker(s.usageCheckInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
var usage uint64
|
||||
usageFn := func(ctx context.Context, entry string) error {
|
||||
select {
|
||||
case <-s.stopUsageCh:
|
||||
return errWalkAbort
|
||||
default:
|
||||
if hasSuffix(entry, slashSeparator) {
|
||||
return nil
|
||||
}
|
||||
fi, err := os.Stat(entry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
usage = usage + uint64(fi.Size())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil {
|
||||
return
|
||||
}
|
||||
atomic.StoreUint64(&s.totalUsage, usage)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.stopUsageCh:
|
||||
return
|
||||
case <-globalServiceDoneCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
usage = 0
|
||||
usageFn = func(ctx context.Context, entry string) error {
|
||||
select {
|
||||
case <-s.stopUsageCh:
|
||||
return errWalkAbort
|
||||
default:
|
||||
if hasSuffix(entry, slashSeparator) {
|
||||
return nil
|
||||
}
|
||||
fi, err := os.Stat(entry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
usage = usage + uint64(fi.Size())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil {
|
||||
continue
|
||||
}
|
||||
atomic.StoreUint64(&s.totalUsage, usage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make a volume entry.
|
||||
func (s *posix) MakeVol(volume string) (err error) {
|
||||
defer func() {
|
||||
|
||||
@@ -18,8 +18,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
)
|
||||
|
||||
// StorageAPI interface.
|
||||
@@ -30,7 +28,7 @@ type StorageAPI interface {
|
||||
// Storage operations.
|
||||
IsOnline() bool // Returns true if disk is online.
|
||||
Close() error
|
||||
DiskInfo() (info disk.Info, err error)
|
||||
DiskInfo() (info DiskInfo, err error)
|
||||
|
||||
// Volume operations.
|
||||
MakeVol(volume string) (err error)
|
||||
|
||||
@@ -23,8 +23,6 @@ import (
|
||||
"net/rpc"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
)
|
||||
|
||||
type networkStorage struct {
|
||||
@@ -164,10 +162,10 @@ func (n *networkStorage) call(handler string, args interface {
|
||||
}
|
||||
|
||||
// DiskInfo - fetch disk information for a remote disk.
|
||||
func (n *networkStorage) DiskInfo() (info disk.Info, err error) {
|
||||
func (n *networkStorage) DiskInfo() (info DiskInfo, err error) {
|
||||
args := AuthRPCArgs{}
|
||||
if err = n.call("Storage.DiskInfoHandler", &args, &info); err != nil {
|
||||
return disk.Info{}, err
|
||||
return DiskInfo{}, err
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
)
|
||||
|
||||
// Storage server implements rpc primitives to facilitate exporting a
|
||||
@@ -39,7 +38,7 @@ type storageServer struct {
|
||||
/// Storage operations handlers.
|
||||
|
||||
// DiskInfoHandler - disk info handler is rpc wrapper for DiskInfo operation.
|
||||
func (s *storageServer) DiskInfoHandler(args *AuthRPCArgs, reply *disk.Info) error {
|
||||
func (s *storageServer) DiskInfoHandler(args *AuthRPCArgs, reply *DiskInfo) error {
|
||||
if err := args.IsAuthenticated(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -19,8 +19,6 @@ package cmd
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
)
|
||||
|
||||
type testStorageRPCServer struct {
|
||||
@@ -91,7 +89,7 @@ func TestStorageRPCInvalidToken(t *testing.T) {
|
||||
Vol: "myvol",
|
||||
}
|
||||
// 1. DiskInfoHandler
|
||||
diskInfoReply := &disk.Info{}
|
||||
diskInfoReply := &DiskInfo{}
|
||||
err = storageRPC.DiskInfoHandler(&badAuthRPCArgs, diskInfoReply)
|
||||
errorIfInvalidToken(t, err)
|
||||
|
||||
|
||||
@@ -282,6 +282,7 @@ func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo {
|
||||
lstorageInfo := set.StorageInfo(ctx)
|
||||
storageInfo.Total = storageInfo.Total + lstorageInfo.Total
|
||||
storageInfo.Free = storageInfo.Free + lstorageInfo.Free
|
||||
storageInfo.Used = storageInfo.Used + lstorageInfo.Used
|
||||
storageInfo.Backend.OnlineDisks = storageInfo.Backend.OnlineDisks + lstorageInfo.Backend.OnlineDisks
|
||||
storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks + lstorageInfo.Backend.OfflineDisks
|
||||
}
|
||||
|
||||
18
cmd/xl-v1.go
18
cmd/xl-v1.go
@@ -23,7 +23,6 @@ import (
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/bpool"
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
)
|
||||
|
||||
// XL constants.
|
||||
@@ -118,7 +117,7 @@ func (xl xlObjects) ClearLocks(ctx context.Context, volLocks []VolumeLockInfo) e
|
||||
}
|
||||
|
||||
// byDiskTotal is a collection satisfying sort.Interface.
|
||||
type byDiskTotal []disk.Info
|
||||
type byDiskTotal []DiskInfo
|
||||
|
||||
func (d byDiskTotal) Len() int { return len(d) }
|
||||
func (d byDiskTotal) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
|
||||
@@ -127,8 +126,8 @@ func (d byDiskTotal) Less(i, j int) bool {
|
||||
}
|
||||
|
||||
// getDisksInfo - fetch disks info across all other storage API.
|
||||
func getDisksInfo(disks []StorageAPI) (disksInfo []disk.Info, onlineDisks int, offlineDisks int) {
|
||||
disksInfo = make([]disk.Info, len(disks))
|
||||
func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks int, offlineDisks int) {
|
||||
disksInfo = make([]DiskInfo, len(disks))
|
||||
for i, storageDisk := range disks {
|
||||
if storageDisk == nil {
|
||||
// Storage disk is empty, perhaps ignored disk or not available.
|
||||
@@ -154,8 +153,8 @@ func getDisksInfo(disks []StorageAPI) (disksInfo []disk.Info, onlineDisks int, o
|
||||
// returns sorted disksInfo slice which has only valid entries.
|
||||
// i.e the entries where the total size of the disk is not stated
|
||||
// as 0Bytes, this means that the disk is not online or ignored.
|
||||
func sortValidDisksInfo(disksInfo []disk.Info) []disk.Info {
|
||||
var validDisksInfo []disk.Info
|
||||
func sortValidDisksInfo(disksInfo []DiskInfo) []DiskInfo {
|
||||
var validDisksInfo []DiskInfo
|
||||
for _, diskInfo := range disksInfo {
|
||||
if diskInfo.Total == 0 {
|
||||
continue
|
||||
@@ -201,6 +200,13 @@ func getStorageInfo(disks []StorageAPI) StorageInfo {
|
||||
Free: validDisksInfo[0].Free * availableDataDisks,
|
||||
}
|
||||
|
||||
// Combine all disks to get total usage.
|
||||
var used uint64
|
||||
for _, di := range validDisksInfo {
|
||||
used = used + di.Used
|
||||
}
|
||||
storageInfo.Used = used
|
||||
|
||||
storageInfo.Backend.Type = Erasure
|
||||
storageInfo.Backend.OnlineDisks = onlineDisks
|
||||
storageInfo.Backend.OfflineDisks = offlineDisks
|
||||
|
||||
@@ -21,8 +21,6 @@ import (
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
)
|
||||
|
||||
// TestStorageInfo - tests storage info.
|
||||
@@ -55,18 +53,18 @@ func TestStorageInfo(t *testing.T) {
|
||||
// Sort valid disks info.
|
||||
func TestSortingValidDisks(t *testing.T) {
|
||||
testCases := []struct {
|
||||
disksInfo []disk.Info
|
||||
validDisksInfo []disk.Info
|
||||
disksInfo []DiskInfo
|
||||
validDisksInfo []DiskInfo
|
||||
}{
|
||||
// One of the disks is offline.
|
||||
{
|
||||
disksInfo: []disk.Info{
|
||||
disksInfo: []DiskInfo{
|
||||
{Total: 150, Free: 10},
|
||||
{Total: 0, Free: 0},
|
||||
{Total: 200, Free: 10},
|
||||
{Total: 100, Free: 10},
|
||||
},
|
||||
validDisksInfo: []disk.Info{
|
||||
validDisksInfo: []DiskInfo{
|
||||
{Total: 100, Free: 10},
|
||||
{Total: 150, Free: 10},
|
||||
{Total: 200, Free: 10},
|
||||
@@ -74,13 +72,13 @@ func TestSortingValidDisks(t *testing.T) {
|
||||
},
|
||||
// All disks are online.
|
||||
{
|
||||
disksInfo: []disk.Info{
|
||||
disksInfo: []DiskInfo{
|
||||
{Total: 150, Free: 10},
|
||||
{Total: 200, Free: 10},
|
||||
{Total: 100, Free: 10},
|
||||
{Total: 115, Free: 10},
|
||||
},
|
||||
validDisksInfo: []disk.Info{
|
||||
validDisksInfo: []DiskInfo{
|
||||
{Total: 100, Free: 10},
|
||||
{Total: 115, Free: 10},
|
||||
{Total: 150, Free: 10},
|
||||
|
||||
Reference in New Issue
Block a user