mirror of
https://github.com/minio/minio.git
synced 2025-01-23 20:53:18 -05:00
23a8411732
This generic Walk() is used by likes of Lifecyle, or KMS to rotate keys or any other functionality which relies on this functionality.
248 lines
6.5 KiB
Go
248 lines
6.5 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2016, 2017, 2018 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/bpool"
|
|
"github.com/minio/minio/pkg/dsync"
|
|
"github.com/minio/minio/pkg/madmin"
|
|
"github.com/minio/minio/pkg/sync/errgroup"
|
|
)
|
|
|
|
// XL constants.
|
|
const (
|
|
// XL metadata file carries per object metadata.
|
|
xlMetaJSONFile = "xl.json"
|
|
)
|
|
|
|
// OfflineDisk represents an unavailable disk.
|
|
var OfflineDisk StorageAPI // zero value is nil
|
|
|
|
// partialUpload is a successful upload of an object
|
|
// but not written in all disks (having quorum)
|
|
type partialUpload struct {
|
|
bucket string
|
|
object string
|
|
failedSet int
|
|
}
|
|
|
|
// xlObjects - Implements XL object layer.
|
|
type xlObjects struct {
|
|
// getDisks returns list of storageAPIs.
|
|
getDisks func() []StorageAPI
|
|
|
|
// getLockers returns list of remote and local lockers.
|
|
getLockers func() []dsync.NetLocker
|
|
|
|
// Locker mutex map.
|
|
nsMutex *nsLockMap
|
|
|
|
// Byte pools used for temporary i/o buffers.
|
|
bp *bpool.BytePoolCap
|
|
|
|
// TODO: ListObjects pool management, should be removed in future.
|
|
listPool *TreeWalkPool
|
|
|
|
mrfUploadCh chan partialUpload
|
|
}
|
|
|
|
// NewNSLock - initialize a new namespace RWLocker instance.
|
|
func (xl xlObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
|
|
return xl.nsMutex.NewNSLock(ctx, xl.getLockers, bucket, objects...)
|
|
}
|
|
|
|
// Shutdown function for object storage interface.
|
|
func (xl xlObjects) Shutdown(ctx context.Context) error {
|
|
// Add any object layer shutdown activities here.
|
|
closeStorageDisks(xl.getDisks())
|
|
return nil
|
|
}
|
|
|
|
// byDiskTotal is a collection satisfying sort.Interface.
|
|
type byDiskTotal []DiskInfo
|
|
|
|
func (d byDiskTotal) Len() int { return len(d) }
|
|
func (d byDiskTotal) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
|
|
func (d byDiskTotal) Less(i, j int) bool {
|
|
return d[i].Total < d[j].Total
|
|
}
|
|
|
|
// getDisksInfo - fetch disks info across all other storage API.
|
|
func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks, offlineDisks madmin.BackendDisks) {
|
|
disksInfo = make([]DiskInfo, len(disks))
|
|
|
|
g := errgroup.WithNErrs(len(disks))
|
|
for index := range disks {
|
|
index := index
|
|
g.Go(func() error {
|
|
if disks[index] == nil {
|
|
// Storage disk is empty, perhaps ignored disk or not available.
|
|
return errDiskNotFound
|
|
}
|
|
info, err := disks[index].DiskInfo()
|
|
if err != nil {
|
|
if IsErr(err, baseErrs...) {
|
|
return err
|
|
}
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("disk", disks[index].String())
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
disksInfo[index] = info
|
|
return nil
|
|
}, index)
|
|
}
|
|
|
|
onlineDisks = make(madmin.BackendDisks)
|
|
offlineDisks = make(madmin.BackendDisks)
|
|
|
|
localNodeAddr := GetLocalPeer(globalEndpoints)
|
|
|
|
// Wait for the routines.
|
|
for i, diskInfoErr := range g.Wait() {
|
|
if disks[i] == nil {
|
|
continue
|
|
}
|
|
peerAddr := disks[i].Hostname()
|
|
if peerAddr == "" {
|
|
peerAddr = localNodeAddr
|
|
}
|
|
if _, ok := offlineDisks[peerAddr]; !ok {
|
|
offlineDisks[peerAddr] = 0
|
|
}
|
|
if _, ok := onlineDisks[peerAddr]; !ok {
|
|
onlineDisks[peerAddr] = 0
|
|
}
|
|
if diskInfoErr != nil {
|
|
offlineDisks[peerAddr]++
|
|
continue
|
|
}
|
|
onlineDisks[peerAddr]++
|
|
}
|
|
|
|
// Success.
|
|
return disksInfo, onlineDisks, offlineDisks
|
|
}
|
|
|
|
// Get an aggregated storage info across all disks.
|
|
func getStorageInfo(disks []StorageAPI) StorageInfo {
|
|
disksInfo, onlineDisks, offlineDisks := getDisksInfo(disks)
|
|
|
|
// Sort so that the first element is the smallest.
|
|
sort.Sort(byDiskTotal(disksInfo))
|
|
|
|
// Combine all disks to get total usage
|
|
usedList := make([]uint64, len(disksInfo))
|
|
totalList := make([]uint64, len(disksInfo))
|
|
availableList := make([]uint64, len(disksInfo))
|
|
mountPaths := make([]string, len(disksInfo))
|
|
|
|
for i, di := range disksInfo {
|
|
usedList[i] = di.Used
|
|
totalList[i] = di.Total
|
|
availableList[i] = di.Free
|
|
mountPaths[i] = di.MountPath
|
|
}
|
|
|
|
storageInfo := StorageInfo{
|
|
Used: usedList,
|
|
Total: totalList,
|
|
Available: availableList,
|
|
MountPaths: mountPaths,
|
|
}
|
|
|
|
storageInfo.Backend.Type = BackendErasure
|
|
storageInfo.Backend.OnlineDisks = onlineDisks
|
|
storageInfo.Backend.OfflineDisks = offlineDisks
|
|
|
|
return storageInfo
|
|
}
|
|
|
|
// StorageInfo - returns underlying storage statistics.
|
|
func (xl xlObjects) StorageInfo(ctx context.Context, local bool) StorageInfo {
|
|
var disks []StorageAPI
|
|
if !local {
|
|
disks = xl.getDisks()
|
|
} else {
|
|
for _, d := range xl.getDisks() {
|
|
if d.Hostname() == "" {
|
|
// Append this local disk since local flag is true
|
|
disks = append(disks, d)
|
|
}
|
|
}
|
|
}
|
|
return getStorageInfo(disks)
|
|
}
|
|
|
|
// GetMetrics - is not implemented and shouldn't be called.
|
|
func (xl xlObjects) GetMetrics(ctx context.Context) (*Metrics, error) {
|
|
logger.LogIf(ctx, NotImplemented{})
|
|
return &Metrics{}, NotImplemented{}
|
|
}
|
|
|
|
// CrawlAndGetDataUsage picks three random disks to crawl and get data usage
|
|
func (xl xlObjects) CrawlAndGetDataUsage(ctx context.Context, endCh <-chan struct{}) DataUsageInfo {
|
|
var randomDisks []StorageAPI
|
|
for _, d := range xl.getLoadBalancedDisks() {
|
|
if d == nil || !d.IsOnline() {
|
|
continue
|
|
}
|
|
randomDisks = append(randomDisks, d)
|
|
if len(randomDisks) >= 3 {
|
|
break
|
|
}
|
|
}
|
|
|
|
var dataUsageResults = make([]DataUsageInfo, len(randomDisks))
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < len(randomDisks); i++ {
|
|
wg.Add(1)
|
|
go func(index int, disk StorageAPI) {
|
|
defer wg.Done()
|
|
var err error
|
|
dataUsageResults[index], err = disk.CrawlAndGetDataUsage(endCh)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}(i, randomDisks[i])
|
|
}
|
|
wg.Wait()
|
|
|
|
var dataUsageInfo = dataUsageResults[0]
|
|
// Pick the crawling result of the disk which has the most
|
|
// number of objects in it.
|
|
for i := 1; i < len(dataUsageResults); i++ {
|
|
if dataUsageResults[i].ObjectsCount > dataUsageInfo.ObjectsCount {
|
|
dataUsageInfo = dataUsageResults[i]
|
|
}
|
|
}
|
|
|
|
return dataUsageInfo
|
|
}
|
|
|
|
// IsReady - No Op.
|
|
func (xl xlObjects) IsReady(ctx context.Context) bool {
|
|
logger.CriticalIf(ctx, NotImplemented{})
|
|
return true
|
|
}
|