mirror of https://github.com/minio/minio.git
876 lines
24 KiB
Go
876 lines
24 KiB
Go
// Copyright (c) 2015-2022 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/lithammer/shortuuid/v4"
|
|
"github.com/minio/madmin-go/v2"
|
|
"github.com/minio/minio/internal/hash"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/env"
|
|
)
|
|
|
|
//go:generate msgp -file $GOFILE -unexported
|
|
|
|
// rebalanceStats contains per-pool rebalance statistics like number of objects,
|
|
// versions and bytes rebalanced out of a pool
|
|
type rebalanceStats struct {
|
|
InitFreeSpace uint64 `json:"initFreeSpace" msg:"ifs"` // Pool free space at the start of rebalance
|
|
InitCapacity uint64 `json:"initCapacity" msg:"ic"` // Pool capacity at the start of rebalance
|
|
|
|
Buckets []string `json:"buckets" msg:"bus"` // buckets being rebalanced or to be rebalanced
|
|
RebalancedBuckets []string `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced
|
|
Bucket string `json:"bucket" msg:"bu"` // Last rebalanced bucket
|
|
Object string `json:"object" msg:"ob"` // Last rebalanced object
|
|
NumObjects uint64 `json:"numObjects" msg:"no"` // Number of objects rebalanced
|
|
NumVersions uint64 `json:"numVersions" msg:"nv"` // Number of versions rebalanced
|
|
Bytes uint64 `json:"bytes" msg:"bs"` // Number of bytes rebalanced
|
|
Participating bool `json:"participating" msg:"par"`
|
|
Info rebalanceInfo `json:"info" msg:"inf"`
|
|
}
|
|
|
|
func (rs *rebalanceStats) update(bucket string, fi FileInfo) {
|
|
if fi.IsLatest {
|
|
rs.NumObjects++
|
|
}
|
|
|
|
rs.NumVersions++
|
|
onDiskSz := fi.Size * int64(fi.Erasure.DataBlocks+fi.Erasure.ParityBlocks) / int64(fi.Erasure.DataBlocks)
|
|
rs.Bytes += uint64(onDiskSz)
|
|
rs.Bucket = bucket
|
|
rs.Object = fi.Name
|
|
}
|
|
|
|
type rstats []*rebalanceStats
|
|
|
|
//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE
|
|
type rebalStatus uint8
|
|
|
|
const (
|
|
rebalNone rebalStatus = iota
|
|
rebalStarted
|
|
rebalCompleted
|
|
rebalStopped
|
|
rebalFailed
|
|
)
|
|
|
|
type rebalanceInfo struct {
|
|
StartTime time.Time `msg:"startTs"` // Time at which rebalance-start was issued
|
|
EndTime time.Time `msg:"stopTs"` // Time at which rebalance operation completed or rebalance-stop was called
|
|
Status rebalStatus `msg:"status"` // Current state of rebalance operation. One of Started|Stopped|Completed|Failed.
|
|
}
|
|
|
|
// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
|
|
type rebalanceMeta struct {
|
|
cancel context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop
|
|
lastRefreshedAt time.Time `msg:"-"`
|
|
StoppedAt time.Time `msg:"stopTs"` // Time when rebalance-stop was issued.
|
|
ID string `msg:"id"` // ID of the ongoing rebalance operation
|
|
PercentFreeGoal float64 `msg:"pf"` // Computed from total free space and capacity at the start of rebalance
|
|
PoolStats []*rebalanceStats `msg:"rss"` // Per-pool rebalance stats keyed by pool index
|
|
}
|
|
|
|
var errRebalanceNotStarted = errors.New("rebalance not started")
|
|
|
|
func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
|
|
r := &rebalanceMeta{}
|
|
err := r.load(ctx, z.serverPools[0])
|
|
if err != nil {
|
|
if errors.Is(err, errConfigNotFound) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
z.rebalMu.Lock()
|
|
z.rebalMeta = r
|
|
z.rebalMu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// initRebalanceMeta initializes rebalance metadata for a new rebalance
|
|
// operation and saves it in the object store.
|
|
func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
|
|
r := &rebalanceMeta{
|
|
ID: shortuuid.New(),
|
|
PoolStats: make([]*rebalanceStats, len(z.serverPools)),
|
|
}
|
|
|
|
// Fetch disk capacity and available space.
|
|
si := z.StorageInfo(ctx)
|
|
diskStats := make([]struct {
|
|
AvailableSpace uint64
|
|
TotalSpace uint64
|
|
}, len(z.serverPools))
|
|
var totalCap, totalFree uint64
|
|
for _, disk := range si.Disks {
|
|
// Ignore invalid.
|
|
if disk.PoolIndex < 0 || len(diskStats) <= disk.PoolIndex {
|
|
// https://github.com/minio/minio/issues/16500
|
|
continue
|
|
}
|
|
totalCap += disk.TotalSpace
|
|
totalFree += disk.AvailableSpace
|
|
|
|
diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
|
|
diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
|
|
}
|
|
r.PercentFreeGoal = float64(totalFree) / float64(totalCap)
|
|
|
|
now := time.Now()
|
|
for idx := range z.serverPools {
|
|
r.PoolStats[idx] = &rebalanceStats{
|
|
Buckets: make([]string, len(buckets)),
|
|
RebalancedBuckets: make([]string, 0, len(buckets)),
|
|
InitFreeSpace: diskStats[idx].AvailableSpace,
|
|
InitCapacity: diskStats[idx].TotalSpace,
|
|
}
|
|
copy(r.PoolStats[idx].Buckets, buckets)
|
|
|
|
if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal {
|
|
r.PoolStats[idx].Participating = true
|
|
r.PoolStats[idx].Info = rebalanceInfo{
|
|
StartTime: now,
|
|
Status: rebalStarted,
|
|
}
|
|
}
|
|
}
|
|
|
|
err = r.save(ctx, z.serverPools[0])
|
|
if err != nil {
|
|
return arn, err
|
|
}
|
|
|
|
z.rebalMeta = r
|
|
return r.ID, nil
|
|
}
|
|
|
|
func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, fi FileInfo) {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.PoolStats[poolIdx].update(bucket, fi)
|
|
}
|
|
|
|
const (
|
|
rebalMetaName = "rebalance.bin"
|
|
rebalMetaFmt = 1
|
|
rebalMetaVer = 1
|
|
)
|
|
|
|
func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
|
|
z.rebalMu.RLock()
|
|
defer z.rebalMu.RUnlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil {
|
|
return "", false
|
|
}
|
|
|
|
ps := r.PoolStats[poolIdx]
|
|
if ps == nil {
|
|
return "", false
|
|
}
|
|
|
|
if ps.Info.Status == rebalCompleted || !ps.Participating {
|
|
return "", false
|
|
}
|
|
|
|
if len(ps.Buckets) == 0 {
|
|
return "", false
|
|
}
|
|
|
|
return ps.Buckets[0], true
|
|
}
|
|
|
|
func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
ps := z.rebalMeta.PoolStats[poolIdx]
|
|
if ps == nil {
|
|
return
|
|
}
|
|
|
|
for i, b := range ps.Buckets {
|
|
if b == bucket {
|
|
ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
|
|
ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
|
|
return r.loadWithOpts(ctx, store, ObjectOptions{})
|
|
}
|
|
|
|
func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
|
|
data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(data) == 0 {
|
|
return nil
|
|
}
|
|
if len(data) <= 4 {
|
|
return fmt.Errorf("rebalanceMeta: no data")
|
|
}
|
|
|
|
// Read header
|
|
switch binary.LittleEndian.Uint16(data[0:2]) {
|
|
case rebalMetaFmt:
|
|
default:
|
|
return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
|
|
}
|
|
switch binary.LittleEndian.Uint16(data[2:4]) {
|
|
case rebalMetaVer:
|
|
default:
|
|
return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
|
|
}
|
|
|
|
// OK, parse data.
|
|
if _, err = r.UnmarshalMsg(data[4:]); err != nil {
|
|
return err
|
|
}
|
|
|
|
r.lastRefreshedAt = time.Now()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
|
|
data := make([]byte, 4, r.Msgsize()+4)
|
|
|
|
// Initialize the header.
|
|
binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt)
|
|
binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer)
|
|
|
|
buf, err := r.MarshalMsg(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts)
|
|
}
|
|
|
|
func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
|
|
return r.saveWithOpts(ctx, store, ObjectOptions{})
|
|
}
|
|
|
|
func (z *erasureServerPools) IsRebalanceStarted() bool {
|
|
z.rebalMu.RLock()
|
|
defer z.rebalMu.RUnlock()
|
|
|
|
if r := z.rebalMeta; r != nil {
|
|
if r.StoppedAt.IsZero() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
|
|
z.rebalMu.RLock()
|
|
defer z.rebalMu.RUnlock()
|
|
|
|
if r := z.rebalMeta; r != nil {
|
|
if !r.StoppedAt.IsZero() {
|
|
return false
|
|
}
|
|
ps := z.rebalMeta.PoolStats[poolIndex]
|
|
return ps.Participating && ps.Info.Status == rebalStarted
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
|
|
doneCh := make(chan struct{})
|
|
defer close(doneCh)
|
|
|
|
// Save rebalance.bin periodically.
|
|
go func() {
|
|
// Update rebalance.bin periodically once every 5-10s, chosen randomly
|
|
// to avoid multiple pool leaders herding to update around the same
|
|
// time.
|
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
randSleepFor := func() time.Duration {
|
|
return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64())
|
|
}
|
|
|
|
timer := time.NewTimer(randSleepFor())
|
|
defer timer.Stop()
|
|
var rebalDone bool
|
|
var traceMsg string
|
|
|
|
for {
|
|
select {
|
|
case <-doneCh:
|
|
// rebalance completed for poolIdx
|
|
now := time.Now()
|
|
z.rebalMu.Lock()
|
|
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted
|
|
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
|
|
z.rebalMu.Unlock()
|
|
|
|
rebalDone = true
|
|
traceMsg = fmt.Sprintf("completed at %s", now)
|
|
|
|
case <-ctx.Done():
|
|
|
|
// rebalance stopped for poolIdx
|
|
now := time.Now()
|
|
z.rebalMu.Lock()
|
|
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped
|
|
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
|
|
z.rebalMeta.cancel = nil // remove the already used context.CancelFunc
|
|
z.rebalMu.Unlock()
|
|
|
|
rebalDone = true
|
|
traceMsg = fmt.Sprintf("stopped at %s", now)
|
|
|
|
case <-timer.C:
|
|
traceMsg = fmt.Sprintf("saved at %s", time.Now())
|
|
}
|
|
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
|
|
err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats)
|
|
stopFn(err)
|
|
logger.LogIf(ctx, err)
|
|
timer.Reset(randSleepFor())
|
|
|
|
if rebalDone {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
bucket, ok := z.nextRebalBucket(poolIdx)
|
|
if !ok {
|
|
// no more buckets to rebalance or target free_space/capacity reached
|
|
break
|
|
}
|
|
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
|
|
err = z.rebalanceBucket(ctx, bucket, poolIdx)
|
|
if err != nil {
|
|
stopFn(err)
|
|
logger.LogIf(ctx, err)
|
|
return
|
|
}
|
|
stopFn(nil)
|
|
z.bucketRebalanceDone(bucket, poolIdx)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
// check if enough objects have been rebalanced
|
|
r := z.rebalMeta
|
|
poolStats := r.PoolStats[poolIdx]
|
|
if poolStats.Info.Status == rebalCompleted {
|
|
return true
|
|
}
|
|
|
|
pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity)
|
|
// Mark pool rebalance as done if within 5% from PercentFreeGoal.
|
|
if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 {
|
|
r.PoolStats[poolIdx].Info.Status = rebalCompleted
|
|
r.PoolStats[poolIdx].Info.EndTime = time.Now()
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// rebalanceBucket rebalances objects under bucket in poolIdx pool
|
|
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error {
|
|
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
|
|
vc, _ := globalBucketVersioningSys.Get(bucket)
|
|
// Check if the current bucket has a configured lifecycle policy
|
|
lc, _ := globalLifecycleSys.Get(bucket)
|
|
// Check if bucket is object locked.
|
|
lr, _ := globalBucketObjectLockSys.Get(bucket)
|
|
|
|
pool := z.serverPools[poolIdx]
|
|
const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
|
|
wStr := env.Get(envRebalanceWorkers, strconv.Itoa(len(pool.sets)))
|
|
workerSize, err := strconv.Atoi(wStr)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("invalid %s value: %s err: %v, defaulting to %d", envRebalanceWorkers, wStr, err, len(pool.sets)))
|
|
workerSize = len(pool.sets)
|
|
}
|
|
workers := make(chan struct{}, workerSize)
|
|
var wg sync.WaitGroup
|
|
for _, set := range pool.sets {
|
|
set := set
|
|
disks := set.getOnlineDisks()
|
|
if len(disks) == 0 {
|
|
logger.LogIf(ctx, fmt.Errorf("no online disks found for set with endpoints %s",
|
|
set.getEndpoints()))
|
|
continue
|
|
}
|
|
|
|
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
|
|
if lc == nil {
|
|
return false
|
|
}
|
|
versioned := vc != nil && vc.Versioned(object)
|
|
objInfo := fi.ToObjectInfo(bucket, object, versioned)
|
|
|
|
evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
|
|
if evt.Action.Delete() {
|
|
globalExpiryState.enqueueByDays(objInfo, evt)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
rebalanceEntry := func(entry metaCacheEntry) {
|
|
defer func() {
|
|
<-workers
|
|
wg.Done()
|
|
}()
|
|
|
|
if entry.isDir() {
|
|
return
|
|
}
|
|
|
|
// rebalance on poolIdx has reached its goal
|
|
if z.checkIfRebalanceDone(poolIdx) {
|
|
return
|
|
}
|
|
|
|
fivs, err := entry.fileInfoVersions(bucket)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// We need a reversed order for rebalance,
|
|
// to create the appropriate stack.
|
|
versionsSorter(fivs.Versions).reverse()
|
|
|
|
var rebalanced int
|
|
for _, version := range fivs.Versions {
|
|
// Skip transitioned objects for now. TBD
|
|
if version.IsRemote() {
|
|
continue
|
|
}
|
|
|
|
// Apply lifecycle rules on the objects that are expired.
|
|
if filterLifecycle(bucket, version.Name, version) {
|
|
logger.LogIf(ctx, fmt.Errorf("found %s/%s (%s) expired object based on ILM rules, skipping and scheduled for deletion", bucket, version.Name, version.VersionID))
|
|
continue
|
|
}
|
|
|
|
// We will skip rebalancing delete markers
|
|
// with single version, its as good as there
|
|
// is no data associated with the object.
|
|
if version.Deleted && len(fivs.Versions) == 1 {
|
|
logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no data to be rebalanced", bucket, version.Name))
|
|
continue
|
|
}
|
|
|
|
if version.Deleted {
|
|
_, err := z.DeleteObject(ctx,
|
|
bucket,
|
|
version.Name,
|
|
ObjectOptions{
|
|
Versioned: vc.PrefixEnabled(version.Name),
|
|
VersionID: version.VersionID,
|
|
MTime: version.ModTime,
|
|
DeleteReplication: version.ReplicationState,
|
|
DeleteMarker: true, // make sure we create a delete marker
|
|
SkipRebalancing: true, // make sure we skip the decommissioned pool
|
|
})
|
|
var failure bool
|
|
if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
failure = true
|
|
}
|
|
|
|
if !failure {
|
|
z.updatePoolStats(poolIdx, bucket, version)
|
|
rebalanced++
|
|
}
|
|
continue
|
|
}
|
|
|
|
var failure bool
|
|
for try := 0; try < 3; try++ {
|
|
// GetObjectReader.Close is called by rebalanceObject
|
|
gr, err := set.GetObjectNInfo(ctx,
|
|
bucket,
|
|
encodeDirObject(version.Name),
|
|
nil,
|
|
http.Header{},
|
|
ObjectOptions{
|
|
VersionID: version.VersionID,
|
|
NoDecryption: true,
|
|
NoLock: true,
|
|
})
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
// object deleted by the application, nothing to do here we move on.
|
|
return
|
|
}
|
|
if err != nil {
|
|
failure = true
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name)
|
|
if err = z.rebalanceObject(ctx, bucket, gr); err != nil {
|
|
stopFn(err)
|
|
failure = true
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
|
|
stopFn(nil)
|
|
failure = false
|
|
break
|
|
}
|
|
|
|
if failure {
|
|
break // break out on first error
|
|
}
|
|
z.updatePoolStats(poolIdx, bucket, version)
|
|
rebalanced++
|
|
}
|
|
|
|
// if all versions were rebalanced, we can delete the object versions.
|
|
if rebalanced == len(fivs.Versions) {
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name)
|
|
_, err := set.DeleteObject(ctx,
|
|
bucket,
|
|
encodeDirObject(entry.name),
|
|
ObjectOptions{
|
|
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
|
},
|
|
)
|
|
stopFn(err)
|
|
auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
// How to resolve partial results.
|
|
resolver := metadataResolutionParams{
|
|
dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
|
objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
|
bucket: bucket,
|
|
}
|
|
err := listPathRaw(ctx, listPathRawOptions{
|
|
disks: disks,
|
|
bucket: bucket,
|
|
recursive: true,
|
|
forwardTo: "",
|
|
minDisks: len(disks) / 2, // to capture all quorum ratios
|
|
reportNotFound: false,
|
|
agreed: func(entry metaCacheEntry) {
|
|
workers <- struct{}{}
|
|
wg.Add(1)
|
|
go rebalanceEntry(entry)
|
|
},
|
|
partial: func(entries metaCacheEntries, _ []error) {
|
|
entry, ok := entries.resolve(&resolver)
|
|
if ok {
|
|
workers <- struct{}{}
|
|
wg.Add(1)
|
|
go rebalanceEntry(*entry)
|
|
}
|
|
},
|
|
finished: nil,
|
|
})
|
|
logger.LogIf(ctx, err)
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
type rebalSaveOpts uint8
|
|
|
|
const (
|
|
rebalSaveStats rebalSaveOpts = iota
|
|
rebalSaveStoppedAt
|
|
)
|
|
|
|
func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error {
|
|
lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
|
|
lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
|
|
return err
|
|
}
|
|
defer lock.Unlock(lkCtx)
|
|
|
|
ctx = lkCtx.Context()
|
|
noLockOpts := ObjectOptions{NoLock: true}
|
|
r := &rebalanceMeta{}
|
|
if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil {
|
|
return err
|
|
}
|
|
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
switch opts {
|
|
case rebalSaveStoppedAt:
|
|
r.StoppedAt = time.Now()
|
|
case rebalSaveStats:
|
|
r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx]
|
|
}
|
|
z.rebalMeta = r
|
|
|
|
err = z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
|
|
return err
|
|
}
|
|
|
|
func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
|
|
errStr := ""
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
auditLogInternal(ctx, AuditLogOptions{
|
|
Event: "rebalance",
|
|
APIName: apiName,
|
|
Bucket: bucket,
|
|
Object: object,
|
|
VersionID: versionID,
|
|
Error: errStr,
|
|
})
|
|
}
|
|
|
|
func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
|
|
oi := gr.ObjInfo
|
|
|
|
defer func() {
|
|
gr.Close()
|
|
auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
|
|
}()
|
|
|
|
actualSize, err := oi.GetActualSize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if oi.isMultipart() {
|
|
res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
|
|
VersionID: oi.VersionID,
|
|
MTime: oi.ModTime,
|
|
UserDefined: oi.UserDefined,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
|
|
}
|
|
defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{})
|
|
|
|
parts := make([]CompletePart, len(oi.Parts))
|
|
for i, part := range oi.Parts {
|
|
hr, err := hash.NewLimitReader(gr, part.Size, "", "", part.ActualSize)
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: hash.NewLimitReader() %w", err)
|
|
}
|
|
pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
|
|
part.Number,
|
|
NewPutObjReader(hr),
|
|
ObjectOptions{
|
|
PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
|
|
IndexCB: func() []byte {
|
|
return part.Index // Preserve part Index to ensure decompression works.
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
|
|
}
|
|
parts[i] = CompletePart{
|
|
ETag: pi.ETag,
|
|
PartNumber: pi.PartNumber,
|
|
}
|
|
}
|
|
_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
|
|
MTime: oi.ModTime,
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
hr, err := hash.NewReader(gr, oi.Size, "", "", actualSize)
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
|
|
}
|
|
_, err = z.PutObject(ctx,
|
|
bucket,
|
|
oi.Name,
|
|
NewPutObjReader(hr),
|
|
ObjectOptions{
|
|
VersionID: oi.VersionID,
|
|
MTime: oi.ModTime,
|
|
UserDefined: oi.UserDefined,
|
|
PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
|
|
IndexCB: func() []byte {
|
|
return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
|
|
},
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (z *erasureServerPools) StartRebalance() {
|
|
z.rebalMu.Lock()
|
|
if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do
|
|
z.rebalMu.Unlock()
|
|
return
|
|
}
|
|
ctx, cancel := context.WithCancel(GlobalContext)
|
|
z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called
|
|
z.rebalMu.Unlock()
|
|
|
|
z.rebalMu.RLock()
|
|
participants := make([]bool, len(z.rebalMeta.PoolStats))
|
|
for i, ps := range z.rebalMeta.PoolStats {
|
|
// skip pools which have completed rebalancing
|
|
if ps.Info.Status != rebalStarted {
|
|
continue
|
|
}
|
|
|
|
participants[i] = ps.Participating
|
|
}
|
|
z.rebalMu.RUnlock()
|
|
|
|
for poolIdx, doRebalance := range participants {
|
|
if !doRebalance {
|
|
continue
|
|
}
|
|
// nothing to do if this node is not pool's first node (i.e pool's rebalance 'leader').
|
|
if !globalEndpoints[poolIdx].Endpoints[0].IsLocal {
|
|
continue
|
|
}
|
|
|
|
go func(idx int) {
|
|
stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
|
|
err := z.rebalanceBuckets(ctx, idx)
|
|
stopfn(err)
|
|
}(poolIdx)
|
|
}
|
|
}
|
|
|
|
// StopRebalance signals the rebalance goroutine running on this node (if any)
|
|
// to stop, using the context.CancelFunc(s) saved at the time ofStartRebalance.
|
|
func (z *erasureServerPools) StopRebalance() error {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil { // rebalance not running in this node, nothing to do
|
|
return nil
|
|
}
|
|
|
|
if cancel := r.cancel; cancel != nil {
|
|
// cancel != nil only on pool leaders
|
|
r.cancel = nil
|
|
cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// for rebalance trace support
|
|
type rebalanceMetrics struct{}
|
|
|
|
var globalRebalanceMetrics rebalanceMetrics
|
|
|
|
//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE
|
|
type rebalanceMetric uint8
|
|
|
|
const (
|
|
rebalanceMetricRebalanceBuckets rebalanceMetric = iota
|
|
rebalanceMetricRebalanceBucket
|
|
rebalanceMetricRebalanceObject
|
|
rebalanceMetricRebalanceRemoveObject
|
|
rebalanceMetricSaveMetadata
|
|
)
|
|
|
|
func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo {
|
|
var errStr string
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
return madmin.TraceInfo{
|
|
TraceType: madmin.TraceRebalance,
|
|
Time: startTime,
|
|
NodeName: globalLocalNodeName,
|
|
FuncName: fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
|
|
Duration: duration,
|
|
Path: path,
|
|
Error: errStr,
|
|
}
|
|
}
|
|
|
|
func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) {
|
|
startTime := time.Now()
|
|
return func(err error) {
|
|
duration := time.Since(startTime)
|
|
if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
|
|
globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " ")))
|
|
}
|
|
}
|
|
}
|