// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/lithammer/shortuuid/v4"
	"github.com/minio/madmin-go/v2"
	"github.com/minio/minio/internal/bucket/lifecycle"
	"github.com/minio/minio/internal/hash"
	"github.com/minio/minio/internal/logger"
	"github.com/minio/pkg/env"
)

//go:generate msgp -file $GOFILE -unexported

// rebalanceStats contains per-pool rebalance statistics like number of objects,
// versions and bytes rebalanced out of a pool
type rebalanceStats struct {
	InitFreeSpace     uint64        `json:"initFreeSpace" msg:"ifs"`     // Pool free space at the start of rebalance
	InitCapacity      uint64        `json:"initCapacity" msg:"ic"`       // Pool capacity at the start of rebalance
	Buckets           []string      `json:"buckets" msg:"bus"`           // buckets being rebalanced or to be rebalanced
	RebalancedBuckets []string      `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced
	Bucket            string        `json:"bucket" msg:"bu"`             // Last rebalanced bucket
	Object            string        `json:"object" msg:"ob"`             // Last rebalanced object
	NumObjects        uint64        `json:"numObjects" msg:"no"`         // Number of objects rebalanced
	NumVersions       uint64        `json:"numVersions" msg:"nv"`        // Number of versions rebalanced
	Bytes             uint64        `json:"bytes" msg:"bs"`              // Number of bytes rebalanced
	Participating     bool          `json:"participating" msg:"par"`
	Info              rebalanceInfo `json:"info" msg:"inf"`
}
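
// update accounts a rebalanced version in the pool's statistics: the object
// counter is incremented only for the latest version, while version count,
// on-disk bytes and the last rebalanced bucket/object are always updated.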
func (rs *rebalanceStats) update(bucket string, fi FileInfo) {
	if fi.IsLatest {
		rs.NumObjects++
	}

	rs.NumVersions++
	// Account for the on-disk (erasure-coded) size, i.e. data plus parity.
	onDiskSz := fi.Size * int64(fi.Erasure.DataBlocks+fi.Erasure.ParityBlocks) / int64(fi.Erasure.DataBlocks)
	rs.Bytes += uint64(onDiskSz)
	rs.Bucket = bucket
	rs.Object = fi.Name
}

type rstats []*rebalanceStats

//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE

type rebalStatus uint8

const (
	rebalNone rebalStatus = iota
	rebalStarted
	rebalCompleted
	rebalStopped
	rebalFailed
)

type rebalanceInfo struct {
	StartTime time.Time   `msg:"startTs"` // Time at which rebalance-start was issued
	EndTime   time.Time   `msg:"stopTs"`  // Time at which rebalance operation completed or rebalance-stop was called
	Status    rebalStatus `msg:"status"`  // Current state of rebalance operation. One of Started|Stopped|Completed|Failed.
}

// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
type rebalanceMeta struct {
	cancel          context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop
	lastRefreshedAt time.Time          `msg:"-"`
	StoppedAt       time.Time          `msg:"stopTs"` // Time when rebalance-stop was issued.
	ID              string             `msg:"id"`     // ID of the ongoing rebalance operation
	PercentFreeGoal float64            `msg:"pf"`     // Computed from total free space and capacity at the start of rebalance
	PoolStats       []*rebalanceStats  `msg:"rss"`    // Per-pool rebalance stats keyed by pool index
}

var errRebalanceNotStarted = errors.New("rebalance not started")
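
// loadRebalanceMeta loads rebalance.bin, if present, from the first pool and
// caches it in memory. A missing rebalance.bin simply means no rebalance has
// been started and is not treated as an error.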
func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
	r := &rebalanceMeta{}
	err := r.load(ctx, z.serverPools[0])
	if err != nil {
		if errors.Is(err, errConfigNotFound) {
			return nil
		}
		return err
	}

	z.rebalMu.Lock()
	z.rebalMeta = r
	z.rebalMu.Unlock()

	return nil
}

// initRebalanceMeta initializes rebalance metadata for a new rebalance
// operation and saves it in the object store.
func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
	r := &rebalanceMeta{
		ID:        shortuuid.New(),
		PoolStats: make([]*rebalanceStats, len(z.serverPools)),
	}

	// Fetch disk capacity and available space.
	si := z.StorageInfo(ctx)
	diskStats := make([]struct {
		AvailableSpace uint64
		TotalSpace     uint64
	}, len(z.serverPools))
	var totalCap, totalFree uint64
	for _, disk := range si.Disks {
		// Ignore invalid.
		if disk.PoolIndex < 0 || len(diskStats) <= disk.PoolIndex {
			// https://github.com/minio/minio/issues/16500
			continue
		}
		totalCap += disk.TotalSpace
		totalFree += disk.AvailableSpace

		diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
		diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
	}
	r.PercentFreeGoal = float64(totalFree) / float64(totalCap)

	now := time.Now()
	for idx := range z.serverPools {
		r.PoolStats[idx] = &rebalanceStats{
			Buckets:           make([]string, len(buckets)),
			RebalancedBuckets: make([]string, 0, len(buckets)),
			InitFreeSpace:     diskStats[idx].AvailableSpace,
			InitCapacity:      diskStats[idx].TotalSpace,
		}
		copy(r.PoolStats[idx].Buckets, buckets)

		// A pool with less than the cluster-average free space participates
		// in rebalance, i.e. objects are moved out of it.
		if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal {
			r.PoolStats[idx].Participating = true
			r.PoolStats[idx].Info = rebalanceInfo{
				StartTime: now,
				Status:    rebalStarted,
			}
		}
	}

	err = r.save(ctx, z.serverPools[0])
	if err != nil {
		return arn, err
	}

	z.rebalMeta = r
	return r.ID, nil
}
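
// updatePoolStats records a rebalanced version against the stats of the pool
// at poolIdx, if a rebalance is in progress.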
func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, fi FileInfo) {
	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	r := z.rebalMeta
	if r == nil {
		return
	}

	r.PoolStats[poolIdx].update(bucket, fi)
}
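
// Rebalance metadata is persisted as rebalance.bin in the minio metadata
// bucket; the msgp payload is preceded by a 4-byte header of little-endian
// uint16 format and version values.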
const (
	rebalMetaName = "rebalance.bin"
	rebalMetaFmt  = 1
	rebalMetaVer  = 1
)
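
// nextRebalBucket returns the next bucket pending rebalance for the pool at
// poolIdx, and false when the pool is not participating, has completed, or
// has no buckets left to rebalance.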
func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
	z.rebalMu.RLock()
	defer z.rebalMu.RUnlock()

	r := z.rebalMeta
	if r == nil {
		return "", false
	}

	ps := r.PoolStats[poolIdx]
	if ps == nil {
		return "", false
	}

	if ps.Info.Status == rebalCompleted || !ps.Participating {
		return "", false
	}

	if len(ps.Buckets) == 0 {
		return "", false
	}

	return ps.Buckets[0], true
}
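
// bucketRebalanceDone moves bucket from the pending list to the rebalanced
// list for the pool at poolIdx.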
func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	ps := z.rebalMeta.PoolStats[poolIdx]
	if ps == nil {
		return
	}

	for i, b := range ps.Buckets {
		if b == bucket {
			ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
			ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
			break
		}
	}
}
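
// load reads rebalance.bin from the backend into r.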
func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
	return r.loadWithOpts(ctx, store, ObjectOptions{})
}

func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
	data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
	if err != nil {
		return err
	}

	if len(data) == 0 {
		return nil
	}
	if len(data) <= 4 {
		return fmt.Errorf("rebalanceMeta: no data")
	}

	// Read header
	switch binary.LittleEndian.Uint16(data[0:2]) {
	case rebalMetaFmt:
	default:
		return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
	}
	switch binary.LittleEndian.Uint16(data[2:4]) {
	case rebalMetaVer:
	default:
		return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
	}

	// OK, parse data.
	if _, err = r.UnmarshalMsg(data[4:]); err != nil {
		return err
	}

	r.lastRefreshedAt = time.Now()

	return nil
}
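
// saveWithOpts serializes r with the format/version header prepended and
// writes it back to rebalance.bin.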
func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
	data := make([]byte, 4, r.Msgsize()+4)

	// Initialize the header.
	binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt)
	binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer)

	buf, err := r.MarshalMsg(data)
	if err != nil {
		return err
	}

	return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts)
}

func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
	return r.saveWithOpts(ctx, store, ObjectOptions{})
}
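
// IsRebalanceStarted returns true if rebalance metadata is loaded and
// rebalance-stop has not been issued.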
func (z *erasureServerPools) IsRebalanceStarted() bool {
	z.rebalMu.RLock()
	defer z.rebalMu.RUnlock()

	if r := z.rebalMeta; r != nil {
		if r.StoppedAt.IsZero() {
			return true
		}
	}
	return false
}
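
// IsPoolRebalancing returns true if the pool at poolIndex is participating in
// an ongoing rebalance.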
func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
	z.rebalMu.RLock()
	defer z.rebalMu.RUnlock()

	if r := z.rebalMeta; r != nil {
		if !r.StoppedAt.IsZero() {
			return false
		}
		ps := z.rebalMeta.PoolStats[poolIndex]
		return ps.Participating && ps.Info.Status == rebalStarted
	}
	return false
}
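
// rebalanceBuckets rebalances buckets, one at a time, out of the pool at
// poolIdx, while a background goroutine periodically persists progress to
// rebalance.bin.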
func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
	doneCh := make(chan struct{})
	defer close(doneCh)

	// Save rebalance.bin periodically.
	go func() {
		// Update rebalance.bin periodically once every 5-10s, chosen randomly
		// to avoid multiple pool leaders herding to update around the same
		// time.
		r := rand.New(rand.NewSource(time.Now().UnixNano()))
		randSleepFor := func() time.Duration {
			return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64())
		}

		timer := time.NewTimer(randSleepFor())
		defer timer.Stop()

		var rebalDone bool
		var traceMsg string

		for {
			select {
			case <-doneCh:
				// rebalance completed for poolIdx
				now := time.Now()
				z.rebalMu.Lock()
				z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted
				z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
				z.rebalMu.Unlock()

				rebalDone = true
				traceMsg = fmt.Sprintf("completed at %s", now)

			case <-ctx.Done():
				// rebalance stopped for poolIdx
				now := time.Now()
				z.rebalMu.Lock()
				z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped
				z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
				z.rebalMeta.cancel = nil // remove the already used context.CancelFunc
				z.rebalMu.Unlock()

				rebalDone = true
				traceMsg = fmt.Sprintf("stopped at %s", now)

			case <-timer.C:
				traceMsg = fmt.Sprintf("saved at %s", time.Now())
			}

			stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
			err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats)
			stopFn(err)
			logger.LogIf(ctx, err)
			timer.Reset(randSleepFor())

			if rebalDone {
				return
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		bucket, ok := z.nextRebalBucket(poolIdx)
		if !ok {
			// no more buckets to rebalance or target free_space/capacity reached
			break
		}

		stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
		err = z.rebalanceBucket(ctx, bucket, poolIdx)
		if err != nil {
			stopFn(err)
			logger.LogIf(ctx, err)
			return
		}
		stopFn(nil)

		z.bucketRebalanceDone(bucket, poolIdx)
	}

	return err
}
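
// checkIfRebalanceDone returns true once the pool at poolIdx is within 5% of
// the cluster-wide free space goal, marking its rebalance as completed.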
func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	// check if enough objects have been rebalanced
	r := z.rebalMeta
	poolStats := r.PoolStats[poolIdx]
	if poolStats.Info.Status == rebalCompleted {
		return true
	}

	pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity)
	// Mark pool rebalance as done if within 5% from PercentFreeGoal.
	if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 {
		r.PoolStats[poolIdx].Info.Status = rebalCompleted
		r.PoolStats[poolIdx].Info.EndTime = time.Now()
		return true
	}

	return false
}

// rebalanceBucket rebalances objects under bucket in poolIdx pool
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error {
	ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})

	vc, _ := globalBucketVersioningSys.Get(bucket)

	// Check if the current bucket has a configured lifecycle policy
	lc, _ := globalLifecycleSys.Get(bucket)

	// Check if bucket is object locked.
	lr, _ := globalBucketObjectLockSys.Get(bucket)

	pool := z.serverPools[poolIdx]

	const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
	wStr := env.Get(envRebalanceWorkers, strconv.Itoa(len(pool.sets)))
	workerSize, err := strconv.Atoi(wStr)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("invalid %s value: %s err: %v, defaulting to %d", envRebalanceWorkers, wStr, err, len(pool.sets)))
		workerSize = len(pool.sets)
	}
	workers := make(chan struct{}, workerSize)
	var wg sync.WaitGroup
	for _, set := range pool.sets {
		set := set
		disks := set.getOnlineDisks()
		if len(disks) == 0 {
			logger.LogIf(ctx, fmt.Errorf("no online disks found for set with endpoints %s",
				set.getEndpoints()))
			continue
		}

		filterLifecycle := func(bucket, object string, fi FileInfo) bool {
			if lc == nil {
				return false
			}
			versioned := vc != nil && vc.Versioned(object)
			objInfo := fi.ToObjectInfo(bucket, object, versioned)

			event := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
			switch action := event.Action; action {
			case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
				globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction)
				// Skip this entry.
				return true
			case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
				globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
				// Skip this entry.
				return true
			}

			return false
		}

		rebalanceEntry := func(entry metaCacheEntry) {
			defer func() {
				<-workers
				wg.Done()
			}()

			if entry.isDir() {
				return
			}

			// rebalance on poolIdx has reached its goal
			if z.checkIfRebalanceDone(poolIdx) {
				return
			}

			fivs, err := entry.fileInfoVersions(bucket)
			if err != nil {
				return
			}

			// We need a reversed order for rebalance,
			// to create the appropriate stack.
			versionsSorter(fivs.Versions).reverse()

			var rebalanced int
			for _, version := range fivs.Versions {
				// Skip transitioned objects for now. TBD
				if version.IsRemote() {
					continue
				}

				// Apply lifecycle rules on the objects that are expired.
				if filterLifecycle(bucket, version.Name, version) {
					logger.LogIf(ctx, fmt.Errorf("found %s/%s (%s) expired object based on ILM rules, skipping and scheduled for deletion", bucket, version.Name, version.VersionID))
					continue
				}

				// Skip rebalancing delete markers with only a single version;
				// there is no data associated with the object.
				if version.Deleted && len(fivs.Versions) == 1 {
					logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no data to be rebalanced", bucket, version.Name))
					continue
				}

				if version.Deleted {
					_, err := z.DeleteObject(ctx,
						bucket,
						version.Name,
						ObjectOptions{
							Versioned:         vc.PrefixEnabled(version.Name),
							VersionID:         version.VersionID,
							MTime:             version.ModTime,
							DeleteReplication: version.ReplicationState,
							DeleteMarker:      true, // make sure we create a delete marker
							SkipRebalancing:   true, // make sure we skip the pool being rebalanced
						})
					var failure bool
					if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
						logger.LogIf(ctx, err)
						failure = true
					}

					if !failure {
						z.updatePoolStats(poolIdx, bucket, version)
						rebalanced++
					}
					continue
				}

				var failure bool
				for try := 0; try < 3; try++ {
					// GetObjectReader.Close is called by rebalanceObject
					gr, err := set.GetObjectNInfo(ctx,
						bucket,
						encodeDirObject(version.Name),
						nil,
						http.Header{},
						noLock, // all mutations are blocked, reads are safe without locks.
						ObjectOptions{
							VersionID:    version.VersionID,
							NoDecryption: true,
						})
					if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
						// object deleted by the application; nothing to do here, we move on.
						return
					}
					if err != nil {
						failure = true
						logger.LogIf(ctx, err)
						continue
					}

					stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name)
					if err = z.rebalanceObject(ctx, bucket, gr); err != nil {
						stopFn(err)
						failure = true
						logger.LogIf(ctx, err)
						continue
					}

					stopFn(nil)
					failure = false
					break
				}
				if failure {
					break // break out on first error
				}
				z.updatePoolStats(poolIdx, bucket, version)
				rebalanced++
			}

			// if all versions were rebalanced, we can delete the object versions.
			if rebalanced == len(fivs.Versions) {
				stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name)
				_, err := set.DeleteObject(ctx,
					bucket,
					encodeDirObject(entry.name),
					ObjectOptions{
						DeletePrefix: true, // use prefix delete to delete all versions at once.
					},
				)
				stopFn(err)
				auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
				if err != nil {
					logger.LogIf(ctx, err)
				}
			}
		}

		wg.Add(1)
		go func() {
			defer wg.Done()

			// How to resolve partial results.
			resolver := metadataResolutionParams{
				dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
				objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
				bucket:    bucket,
			}

			err := listPathRaw(ctx, listPathRawOptions{
				disks:          disks,
				bucket:         bucket,
				recursive:      true,
				forwardTo:      "",
				minDisks:       len(disks) / 2, // to capture all quorum ratios
				reportNotFound: false,
				agreed: func(entry metaCacheEntry) {
					workers <- struct{}{}
					wg.Add(1)
					go rebalanceEntry(entry)
				},
				partial: func(entries metaCacheEntries, _ []error) {
					entry, ok := entries.resolve(&resolver)
					if ok {
						workers <- struct{}{}
						wg.Add(1)
						go rebalanceEntry(*entry)
					}
				},
				finished: nil,
			})
			logger.LogIf(ctx, err)
		}()
	}

	wg.Wait()
	return nil
}

type rebalSaveOpts uint8

const (
	rebalSaveStats rebalSaveOpts = iota
	rebalSaveStoppedAt
)
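
// saveRebalanceStats persists rebalance.bin under a namespace lock, merging
// either this pool's in-memory stats or the stop timestamp into the latest
// on-disk copy.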
func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error {
	lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
	lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
		return err
	}
	defer lock.Unlock(lkCtx)

	ctx = lkCtx.Context()
	noLockOpts := ObjectOptions{NoLock: true}
	r := &rebalanceMeta{}
	if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil {
		return err
	}

	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	switch opts {
	case rebalSaveStoppedAt:
		r.StoppedAt = time.Now()
	case rebalSaveStats:
		r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx]
	}
	z.rebalMeta = r

	err = z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
	return err
}
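
// auditLogRebalance emits an audit event for a rebalance operation on the
// given bucket/object.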
func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
	errStr := ""
	if err != nil {
		errStr = err.Error()
	}
	auditLogInternal(ctx, AuditLogOptions{
		Event:     "rebalance",
		APIName:   apiName,
		Bucket:    bucket,
		Object:    object,
		VersionID: versionID,
		Error:     errStr,
	})
}
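
// rebalanceObject copies the object version read via gr to its new pool;
// multipart objects are re-uploaded part by part so that part boundaries,
// ETags and compression indices are preserved.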
func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
	oi := gr.ObjInfo

	defer func() {
		gr.Close()
		auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
	}()

	actualSize, err := oi.GetActualSize()
	if err != nil {
		return err
	}

	if oi.isMultipart() {
		res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
			VersionID:   oi.VersionID,
			MTime:       oi.ModTime,
			UserDefined: oi.UserDefined,
		})
		if err != nil {
			return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
		}
		defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{})

		parts := make([]CompletePart, len(oi.Parts))
		for i, part := range oi.Parts {
			hr, err := hash.NewReader(gr, part.Size, "", "", part.ActualSize)
			if err != nil {
				return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
			}
			pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
				part.Number,
				NewPutObjReader(hr),
				ObjectOptions{
					PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
					IndexCB: func() []byte {
						return part.Index // Preserve part Index to ensure decompression works.
					},
				})
			if err != nil {
				return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
			}
			parts[i] = CompletePart{
				ETag:       pi.ETag,
				PartNumber: pi.PartNumber,
			}
		}
		_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
			MTime: oi.ModTime,
		})
		if err != nil {
			err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
		}
		return err
	}

	hr, err := hash.NewReader(gr, oi.Size, "", "", actualSize)
	if err != nil {
		return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
	}
	_, err = z.PutObject(ctx,
		bucket,
		oi.Name,
		NewPutObjReader(hr),
		ObjectOptions{
			VersionID:    oi.VersionID,
			MTime:        oi.ModTime,
			UserDefined:  oi.UserDefined,
			PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
			IndexCB: func() []byte {
				return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
			},
		})
	if err != nil {
		err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
	}
	return err
}
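
// StartRebalance spawns a rebalance routine for each participating pool for
// which this node is the rebalance 'leader', i.e. it hosts the pool's first
// endpoint.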
func (z *erasureServerPools) StartRebalance() {
	z.rebalMu.Lock()
	if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do
		z.rebalMu.Unlock()
		return
	}
	ctx, cancel := context.WithCancel(GlobalContext)
	z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called
	z.rebalMu.Unlock()

	z.rebalMu.RLock()
	participants := make([]bool, len(z.rebalMeta.PoolStats))
	for i, ps := range z.rebalMeta.PoolStats {
		// skip pools which have completed rebalancing
		if ps.Info.Status != rebalStarted {
			continue
		}

		participants[i] = ps.Participating
	}
	z.rebalMu.RUnlock()

	for poolIdx, doRebalance := range participants {
		if !doRebalance {
			continue
		}
		// nothing to do if this node is not the pool's first node (i.e. the pool's rebalance 'leader').
		if !globalEndpoints[poolIdx].Endpoints[0].IsLocal {
			continue
		}

		go func(idx int) {
			stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
			err := z.rebalanceBuckets(ctx, idx)
			stopfn(err)
		}(poolIdx)
	}
}

// StopRebalance signals the rebalance goroutine running on this node (if any)
// to stop, using the context.CancelFunc(s) saved at the time of StartRebalance.
func (z *erasureServerPools) StopRebalance() error {
	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	r := z.rebalMeta
	if r == nil { // rebalance not running in this node, nothing to do
		return nil
	}

	if cancel := r.cancel; cancel != nil {
		// cancel != nil only on pool leaders
		r.cancel = nil
		cancel()
	}
	return nil
}

// for rebalance trace support
type rebalanceMetrics struct{}

var globalRebalanceMetrics rebalanceMetrics

//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE

type rebalanceMetric uint8

const (
	rebalanceMetricRebalanceBuckets rebalanceMetric = iota
	rebalanceMetricRebalanceBucket
	rebalanceMetricRebalanceObject
	rebalanceMetricRebalanceRemoveObject
	rebalanceMetricSaveMetadata
)
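
// rebalanceTrace converts a rebalance metric event into a madmin.TraceInfo
// for admin trace subscribers.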
func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo {
	var errStr string
	if err != nil {
		errStr = err.Error()
	}
	return madmin.TraceInfo{
		TraceType: madmin.TraceRebalance,
		Time:      startTime,
		NodeName:  globalLocalNodeName,
		FuncName:  fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
		Duration:  duration,
		Path:      path,
		Error:     errStr,
	}
}
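
// log starts timing a rebalance metric and returns a callback that, given the
// eventual error, publishes a trace event if there are rebalance trace
// subscribers.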
func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) {
	startTime := time.Now()
	return func(err error) {
		duration := time.Since(startTime)
		if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
			globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " ")))
		}
	}
}