mirror of
https://github.com/minio/minio.git
synced 2024-12-26 07:05:55 -05:00
9af6c6ceef
A continuation of PR #17479: rebalance behavior must also match the decommission behavior. Fixes a bug where rebalance would ignore the remaining object versions after one of the versions returned "ObjectNotFound".
891 lines
24 KiB
Go
891 lines
24 KiB
Go
// Copyright (c) 2015-2022 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"math/rand"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/lithammer/shortuuid/v4"
|
|
"github.com/minio/madmin-go/v3"
|
|
"github.com/minio/minio/internal/hash"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/env"
|
|
)
|
|
|
|
//go:generate msgp -file $GOFILE -unexported
|
|
|
|
// rebalanceStats contains per-pool rebalance statistics like number of objects,
|
|
// versions and bytes rebalanced out of a pool
|
|
type rebalanceStats struct {
|
|
InitFreeSpace uint64 `json:"initFreeSpace" msg:"ifs"` // Pool free space at the start of rebalance
|
|
InitCapacity uint64 `json:"initCapacity" msg:"ic"` // Pool capacity at the start of rebalance
|
|
|
|
Buckets []string `json:"buckets" msg:"bus"` // buckets being rebalanced or to be rebalanced
|
|
RebalancedBuckets []string `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced
|
|
Bucket string `json:"bucket" msg:"bu"` // Last rebalanced bucket
|
|
Object string `json:"object" msg:"ob"` // Last rebalanced object
|
|
NumObjects uint64 `json:"numObjects" msg:"no"` // Number of objects rebalanced
|
|
NumVersions uint64 `json:"numVersions" msg:"nv"` // Number of versions rebalanced
|
|
Bytes uint64 `json:"bytes" msg:"bs"` // Number of bytes rebalanced
|
|
Participating bool `json:"participating" msg:"par"`
|
|
Info rebalanceInfo `json:"info" msg:"inf"`
|
|
}
|
|
|
|
func (rs *rebalanceStats) update(bucket string, fi FileInfo) {
|
|
if fi.IsLatest {
|
|
rs.NumObjects++
|
|
}
|
|
|
|
rs.NumVersions++
|
|
onDiskSz := int64(0)
|
|
if !fi.Deleted {
|
|
onDiskSz = fi.Size * int64(fi.Erasure.DataBlocks+fi.Erasure.ParityBlocks) / int64(fi.Erasure.DataBlocks)
|
|
}
|
|
rs.Bytes += uint64(onDiskSz)
|
|
rs.Bucket = bucket
|
|
rs.Object = fi.Name
|
|
}
|
|
|
|
// rstats is a list of pointers to per-pool rebalance statistics.
type rstats []*rebalanceStats
|
|
|
|
//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE

// rebalStatus enumerates the states of a pool's rebalance operation.
type rebalStatus uint8

// The values are stored in rebalanceInfo.Status, so their order must stay
// stable.
const (
	// rebalNone is the zero value: no rebalance state recorded.
	rebalNone rebalStatus = iota
	// rebalStarted marks a pool whose rebalance has been started.
	rebalStarted
	// rebalCompleted marks a pool whose rebalance has finished.
	rebalCompleted
	// rebalStopped marks a pool whose rebalance was stopped.
	rebalStopped
	// rebalFailed marks a pool whose rebalance failed.
	rebalFailed
)
|
|
|
|
type rebalanceInfo struct {
|
|
StartTime time.Time `msg:"startTs"` // Time at which rebalance-start was issued
|
|
EndTime time.Time `msg:"stopTs"` // Time at which rebalance operation completed or rebalance-stop was called
|
|
Status rebalStatus `msg:"status"` // Current state of rebalance operation. One of Started|Stopped|Completed|Failed.
|
|
}
|
|
|
|
// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
|
|
type rebalanceMeta struct {
|
|
cancel context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop
|
|
lastRefreshedAt time.Time `msg:"-"`
|
|
StoppedAt time.Time `msg:"stopTs"` // Time when rebalance-stop was issued.
|
|
ID string `msg:"id"` // ID of the ongoing rebalance operation
|
|
PercentFreeGoal float64 `msg:"pf"` // Computed from total free space and capacity at the start of rebalance
|
|
PoolStats []*rebalanceStats `msg:"rss"` // Per-pool rebalance stats keyed by pool index
|
|
}
|
|
|
|
// errRebalanceNotStarted is a sentinel error reporting that no rebalance
// has been started.
// NOTE(review): no use of it is visible in this part of the file.
var errRebalanceNotStarted = errors.New("rebalance not started")
|
|
|
|
func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
|
|
r := &rebalanceMeta{}
|
|
err := r.load(ctx, z.serverPools[0])
|
|
if err != nil {
|
|
if errors.Is(err, errConfigNotFound) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
z.rebalMu.Lock()
|
|
z.rebalMeta = r
|
|
z.rebalMu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// initRebalanceMeta initializes rebalance metadata for a new rebalance
|
|
// operation and saves it in the object store.
|
|
func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
|
|
r := &rebalanceMeta{
|
|
ID: shortuuid.New(),
|
|
PoolStats: make([]*rebalanceStats, len(z.serverPools)),
|
|
}
|
|
|
|
// Fetch disk capacity and available space.
|
|
si := z.StorageInfo(ctx)
|
|
diskStats := make([]struct {
|
|
AvailableSpace uint64
|
|
TotalSpace uint64
|
|
}, len(z.serverPools))
|
|
var totalCap, totalFree uint64
|
|
for _, disk := range si.Disks {
|
|
// Ignore invalid.
|
|
if disk.PoolIndex < 0 || len(diskStats) <= disk.PoolIndex {
|
|
// https://github.com/minio/minio/issues/16500
|
|
continue
|
|
}
|
|
totalCap += disk.TotalSpace
|
|
totalFree += disk.AvailableSpace
|
|
|
|
diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
|
|
diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
|
|
}
|
|
r.PercentFreeGoal = float64(totalFree) / float64(totalCap)
|
|
|
|
now := time.Now()
|
|
for idx := range z.serverPools {
|
|
r.PoolStats[idx] = &rebalanceStats{
|
|
Buckets: make([]string, len(buckets)),
|
|
RebalancedBuckets: make([]string, 0, len(buckets)),
|
|
InitFreeSpace: diskStats[idx].AvailableSpace,
|
|
InitCapacity: diskStats[idx].TotalSpace,
|
|
}
|
|
copy(r.PoolStats[idx].Buckets, buckets)
|
|
|
|
if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal {
|
|
r.PoolStats[idx].Participating = true
|
|
r.PoolStats[idx].Info = rebalanceInfo{
|
|
StartTime: now,
|
|
Status: rebalStarted,
|
|
}
|
|
}
|
|
}
|
|
|
|
err = r.save(ctx, z.serverPools[0])
|
|
if err != nil {
|
|
return arn, err
|
|
}
|
|
|
|
z.rebalMeta = r
|
|
return r.ID, nil
|
|
}
|
|
|
|
func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, fi FileInfo) {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.PoolStats[poolIdx].update(bucket, fi)
|
|
}
|
|
|
|
const (
	// rebalMetaName is the name under which rebalance metadata is stored.
	rebalMetaName = "rebalance.bin"
	// rebalMetaFmt is the format identifier written in the first two
	// header bytes of the stored metadata.
	rebalMetaFmt = 1
	// rebalMetaVer is the version identifier written in the next two
	// header bytes of the stored metadata.
	rebalMetaVer = 1
)
|
|
|
|
func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
|
|
z.rebalMu.RLock()
|
|
defer z.rebalMu.RUnlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil {
|
|
return "", false
|
|
}
|
|
|
|
ps := r.PoolStats[poolIdx]
|
|
if ps == nil {
|
|
return "", false
|
|
}
|
|
|
|
if ps.Info.Status == rebalCompleted || !ps.Participating {
|
|
return "", false
|
|
}
|
|
|
|
if len(ps.Buckets) == 0 {
|
|
return "", false
|
|
}
|
|
|
|
return ps.Buckets[0], true
|
|
}
|
|
|
|
func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
ps := z.rebalMeta.PoolStats[poolIdx]
|
|
if ps == nil {
|
|
return
|
|
}
|
|
|
|
for i, b := range ps.Buckets {
|
|
if b == bucket {
|
|
ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
|
|
ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
|
|
return r.loadWithOpts(ctx, store, ObjectOptions{})
|
|
}
|
|
|
|
func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
|
|
data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(data) == 0 {
|
|
return nil
|
|
}
|
|
if len(data) <= 4 {
|
|
return fmt.Errorf("rebalanceMeta: no data")
|
|
}
|
|
|
|
// Read header
|
|
switch binary.LittleEndian.Uint16(data[0:2]) {
|
|
case rebalMetaFmt:
|
|
default:
|
|
return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
|
|
}
|
|
switch binary.LittleEndian.Uint16(data[2:4]) {
|
|
case rebalMetaVer:
|
|
default:
|
|
return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
|
|
}
|
|
|
|
// OK, parse data.
|
|
if _, err = r.UnmarshalMsg(data[4:]); err != nil {
|
|
return err
|
|
}
|
|
|
|
r.lastRefreshedAt = time.Now()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
|
|
data := make([]byte, 4, r.Msgsize()+4)
|
|
|
|
// Initialize the header.
|
|
binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt)
|
|
binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer)
|
|
|
|
buf, err := r.MarshalMsg(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts)
|
|
}
|
|
|
|
func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
|
|
return r.saveWithOpts(ctx, store, ObjectOptions{})
|
|
}
|
|
|
|
func (z *erasureServerPools) IsRebalanceStarted() bool {
|
|
z.rebalMu.RLock()
|
|
defer z.rebalMu.RUnlock()
|
|
|
|
if r := z.rebalMeta; r != nil {
|
|
if r.StoppedAt.IsZero() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
|
|
z.rebalMu.RLock()
|
|
defer z.rebalMu.RUnlock()
|
|
|
|
if r := z.rebalMeta; r != nil {
|
|
if !r.StoppedAt.IsZero() {
|
|
return false
|
|
}
|
|
ps := z.rebalMeta.PoolStats[poolIndex]
|
|
return ps.Participating && ps.Info.Status == rebalStarted
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
|
|
doneCh := make(chan struct{})
|
|
defer close(doneCh)
|
|
|
|
// Save rebalance.bin periodically.
|
|
go func() {
|
|
// Update rebalance.bin periodically once every 5-10s, chosen randomly
|
|
// to avoid multiple pool leaders herding to update around the same
|
|
// time.
|
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
randSleepFor := func() time.Duration {
|
|
return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64())
|
|
}
|
|
|
|
timer := time.NewTimer(randSleepFor())
|
|
defer timer.Stop()
|
|
var rebalDone bool
|
|
var traceMsg string
|
|
|
|
for {
|
|
select {
|
|
case <-doneCh:
|
|
// rebalance completed for poolIdx
|
|
now := time.Now()
|
|
z.rebalMu.Lock()
|
|
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted
|
|
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
|
|
z.rebalMu.Unlock()
|
|
|
|
rebalDone = true
|
|
traceMsg = fmt.Sprintf("completed at %s", now)
|
|
|
|
case <-ctx.Done():
|
|
|
|
// rebalance stopped for poolIdx
|
|
now := time.Now()
|
|
z.rebalMu.Lock()
|
|
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped
|
|
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
|
|
z.rebalMeta.cancel = nil // remove the already used context.CancelFunc
|
|
z.rebalMu.Unlock()
|
|
|
|
rebalDone = true
|
|
traceMsg = fmt.Sprintf("stopped at %s", now)
|
|
|
|
case <-timer.C:
|
|
traceMsg = fmt.Sprintf("saved at %s", time.Now())
|
|
}
|
|
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
|
|
err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats)
|
|
stopFn(err)
|
|
logger.LogIf(ctx, err)
|
|
timer.Reset(randSleepFor())
|
|
|
|
if rebalDone {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
bucket, ok := z.nextRebalBucket(poolIdx)
|
|
if !ok {
|
|
// no more buckets to rebalance or target free_space/capacity reached
|
|
break
|
|
}
|
|
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
|
|
err = z.rebalanceBucket(ctx, bucket, poolIdx)
|
|
if err != nil {
|
|
stopFn(err)
|
|
logger.LogIf(ctx, err)
|
|
return
|
|
}
|
|
stopFn(nil)
|
|
z.bucketRebalanceDone(bucket, poolIdx)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
// check if enough objects have been rebalanced
|
|
r := z.rebalMeta
|
|
poolStats := r.PoolStats[poolIdx]
|
|
if poolStats.Info.Status == rebalCompleted {
|
|
return true
|
|
}
|
|
|
|
pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity)
|
|
// Mark pool rebalance as done if within 5% from PercentFreeGoal.
|
|
if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 {
|
|
r.PoolStats[poolIdx].Info.Status = rebalCompleted
|
|
r.PoolStats[poolIdx].Info.EndTime = time.Now()
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// rebalanceBucket rebalances objects under bucket in poolIdx pool
|
|
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error {
|
|
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
|
|
vc, _ := globalBucketVersioningSys.Get(bucket)
|
|
// Check if the current bucket has a configured lifecycle policy
|
|
lc, _ := globalLifecycleSys.Get(bucket)
|
|
// Check if bucket is object locked.
|
|
lr, _ := globalBucketObjectLockSys.Get(bucket)
|
|
|
|
pool := z.serverPools[poolIdx]
|
|
const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
|
|
wStr := env.Get(envRebalanceWorkers, strconv.Itoa(len(pool.sets)))
|
|
workerSize, err := strconv.Atoi(wStr)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("invalid %s value: %s err: %v, defaulting to %d", envRebalanceWorkers, wStr, err, len(pool.sets)))
|
|
workerSize = len(pool.sets)
|
|
}
|
|
workers := make(chan struct{}, workerSize)
|
|
var wg sync.WaitGroup
|
|
for _, set := range pool.sets {
|
|
set := set
|
|
disks := set.getOnlineDisks()
|
|
if len(disks) == 0 {
|
|
logger.LogIf(ctx, fmt.Errorf("no online disks found for set with endpoints %s",
|
|
set.getEndpoints()))
|
|
continue
|
|
}
|
|
|
|
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
|
|
if lc == nil {
|
|
return false
|
|
}
|
|
versioned := vc != nil && vc.Versioned(object)
|
|
objInfo := fi.ToObjectInfo(bucket, object, versioned)
|
|
|
|
evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
|
|
if evt.Action.Delete() {
|
|
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_Rebal)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
rebalanceEntry := func(entry metaCacheEntry) {
|
|
defer func() {
|
|
<-workers
|
|
wg.Done()
|
|
}()
|
|
|
|
if entry.isDir() {
|
|
return
|
|
}
|
|
|
|
// rebalance on poolIdx has reached its goal
|
|
if z.checkIfRebalanceDone(poolIdx) {
|
|
return
|
|
}
|
|
|
|
fivs, err := entry.fileInfoVersions(bucket)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// We need a reversed order for rebalance,
|
|
// to create the appropriate stack.
|
|
versionsSorter(fivs.Versions).reverse()
|
|
|
|
var rebalanced, expired int
|
|
for _, version := range fivs.Versions {
|
|
// Skip transitioned objects for now. TBD
|
|
if version.IsRemote() {
|
|
continue
|
|
}
|
|
|
|
// Apply lifecycle rules on the objects that are expired.
|
|
if filterLifecycle(bucket, version.Name, version) {
|
|
rebalanced++
|
|
expired++
|
|
continue
|
|
}
|
|
|
|
// any object with only single DEL marker we don't need
|
|
// to rebalance, just skip it, this also includes
|
|
// any other versions that have already expired.
|
|
remainingVersions := len(fivs.Versions) - expired
|
|
if version.Deleted && remainingVersions == 1 {
|
|
rebalanced++
|
|
continue
|
|
}
|
|
|
|
versionID := version.VersionID
|
|
if versionID == "" {
|
|
versionID = nullVersionID
|
|
}
|
|
|
|
if version.Deleted {
|
|
_, err := z.DeleteObject(ctx,
|
|
bucket,
|
|
version.Name,
|
|
ObjectOptions{
|
|
Versioned: true,
|
|
VersionID: versionID,
|
|
MTime: version.ModTime,
|
|
DeleteReplication: version.ReplicationState,
|
|
DeleteMarker: true, // make sure we create a delete marker
|
|
SkipRebalancing: true, // make sure we skip the decommissioned pool
|
|
})
|
|
var failure bool
|
|
if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
failure = true
|
|
}
|
|
|
|
if !failure {
|
|
z.updatePoolStats(poolIdx, bucket, version)
|
|
rebalanced++
|
|
}
|
|
continue
|
|
}
|
|
|
|
var failure, ignore bool
|
|
for try := 0; try < 3; try++ {
|
|
// GetObjectReader.Close is called by rebalanceObject
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID)
|
|
gr, err := set.GetObjectNInfo(ctx,
|
|
bucket,
|
|
encodeDirObject(version.Name),
|
|
nil,
|
|
http.Header{},
|
|
ObjectOptions{
|
|
VersionID: versionID,
|
|
NoDecryption: true,
|
|
NoLock: true,
|
|
})
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
// object deleted by the application, nothing to do here we move on.
|
|
ignore = true
|
|
stopFn(nil)
|
|
break
|
|
}
|
|
if err != nil {
|
|
failure = true
|
|
logger.LogIf(ctx, err)
|
|
stopFn(err)
|
|
continue
|
|
}
|
|
|
|
if err = z.rebalanceObject(ctx, bucket, gr); err != nil {
|
|
failure = true
|
|
logger.LogIf(ctx, err)
|
|
stopFn(err)
|
|
continue
|
|
}
|
|
|
|
stopFn(nil)
|
|
failure = false
|
|
break
|
|
}
|
|
if ignore {
|
|
continue
|
|
}
|
|
if failure {
|
|
break // break out on first error
|
|
}
|
|
z.updatePoolStats(poolIdx, bucket, version)
|
|
rebalanced++
|
|
}
|
|
|
|
// if all versions were rebalanced, we can delete the object versions.
|
|
if rebalanced == len(fivs.Versions) {
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name)
|
|
_, err := set.DeleteObject(ctx,
|
|
bucket,
|
|
encodeDirObject(entry.name),
|
|
ObjectOptions{
|
|
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
|
},
|
|
)
|
|
stopFn(err)
|
|
auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
// How to resolve partial results.
|
|
resolver := metadataResolutionParams{
|
|
dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
|
objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
|
bucket: bucket,
|
|
}
|
|
err := listPathRaw(ctx, listPathRawOptions{
|
|
disks: disks,
|
|
bucket: bucket,
|
|
recursive: true,
|
|
forwardTo: "",
|
|
minDisks: len(disks) / 2, // to capture all quorum ratios
|
|
reportNotFound: false,
|
|
agreed: func(entry metaCacheEntry) {
|
|
workers <- struct{}{}
|
|
wg.Add(1)
|
|
go rebalanceEntry(entry)
|
|
},
|
|
partial: func(entries metaCacheEntries, _ []error) {
|
|
entry, ok := entries.resolve(&resolver)
|
|
if ok {
|
|
workers <- struct{}{}
|
|
wg.Add(1)
|
|
go rebalanceEntry(*entry)
|
|
}
|
|
},
|
|
finished: nil,
|
|
})
|
|
logger.LogIf(ctx, err)
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// rebalSaveOpts selects what saveRebalanceStats updates before persisting
// the rebalance metadata.
type rebalSaveOpts uint8

const (
	// rebalSaveStats copies the in-memory stats of one pool into the
	// metadata being saved.
	rebalSaveStats rebalSaveOpts = iota
	// rebalSaveStoppedAt stamps the metadata being saved with the current
	// time as its stop time.
	rebalSaveStoppedAt
)
|
|
|
|
func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error {
|
|
lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
|
|
lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
|
|
return err
|
|
}
|
|
defer lock.Unlock(lkCtx)
|
|
|
|
ctx = lkCtx.Context()
|
|
noLockOpts := ObjectOptions{NoLock: true}
|
|
r := &rebalanceMeta{}
|
|
if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil {
|
|
return err
|
|
}
|
|
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
switch opts {
|
|
case rebalSaveStoppedAt:
|
|
r.StoppedAt = time.Now()
|
|
case rebalSaveStats:
|
|
r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx]
|
|
}
|
|
z.rebalMeta = r
|
|
|
|
err = z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
|
|
return err
|
|
}
|
|
|
|
func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
|
|
errStr := ""
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
auditLogInternal(ctx, AuditLogOptions{
|
|
Event: "rebalance",
|
|
APIName: apiName,
|
|
Bucket: bucket,
|
|
Object: object,
|
|
VersionID: versionID,
|
|
Error: errStr,
|
|
})
|
|
}
|
|
|
|
func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
|
|
oi := gr.ObjInfo
|
|
|
|
defer func() {
|
|
gr.Close()
|
|
auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
|
|
}()
|
|
|
|
actualSize, err := oi.GetActualSize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if oi.isMultipart() {
|
|
res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
|
|
VersionID: oi.VersionID,
|
|
UserDefined: oi.UserDefined,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
|
|
}
|
|
defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{})
|
|
|
|
parts := make([]CompletePart, len(oi.Parts))
|
|
for i, part := range oi.Parts {
|
|
hr, err := hash.NewReader(io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
|
|
}
|
|
pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
|
|
part.Number,
|
|
NewPutObjReader(hr),
|
|
ObjectOptions{
|
|
PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
|
|
IndexCB: func() []byte {
|
|
return part.Index // Preserve part Index to ensure decompression works.
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
|
|
}
|
|
parts[i] = CompletePart{
|
|
ETag: pi.ETag,
|
|
PartNumber: pi.PartNumber,
|
|
}
|
|
}
|
|
_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
|
|
MTime: oi.ModTime,
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
hr, err := hash.NewReader(gr, oi.Size, "", "", actualSize)
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
|
|
}
|
|
_, err = z.PutObject(ctx,
|
|
bucket,
|
|
oi.Name,
|
|
NewPutObjReader(hr),
|
|
ObjectOptions{
|
|
VersionID: oi.VersionID,
|
|
MTime: oi.ModTime,
|
|
UserDefined: oi.UserDefined,
|
|
PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
|
|
IndexCB: func() []byte {
|
|
return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
|
|
},
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (z *erasureServerPools) StartRebalance() {
|
|
z.rebalMu.Lock()
|
|
if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do
|
|
z.rebalMu.Unlock()
|
|
return
|
|
}
|
|
ctx, cancel := context.WithCancel(GlobalContext)
|
|
z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called
|
|
z.rebalMu.Unlock()
|
|
|
|
z.rebalMu.RLock()
|
|
participants := make([]bool, len(z.rebalMeta.PoolStats))
|
|
for i, ps := range z.rebalMeta.PoolStats {
|
|
// skip pools which have completed rebalancing
|
|
if ps.Info.Status != rebalStarted {
|
|
continue
|
|
}
|
|
|
|
participants[i] = ps.Participating
|
|
}
|
|
z.rebalMu.RUnlock()
|
|
|
|
for poolIdx, doRebalance := range participants {
|
|
if !doRebalance {
|
|
continue
|
|
}
|
|
// nothing to do if this node is not pool's first node (i.e pool's rebalance 'leader').
|
|
if !globalEndpoints[poolIdx].Endpoints[0].IsLocal {
|
|
continue
|
|
}
|
|
|
|
go func(idx int) {
|
|
stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
|
|
err := z.rebalanceBuckets(ctx, idx)
|
|
stopfn(err)
|
|
}(poolIdx)
|
|
}
|
|
}
|
|
|
|
// StopRebalance signals the rebalance goroutine running on this node (if any)
|
|
// to stop, using the context.CancelFunc(s) saved at the time ofStartRebalance.
|
|
func (z *erasureServerPools) StopRebalance() error {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil { // rebalance not running in this node, nothing to do
|
|
return nil
|
|
}
|
|
|
|
if cancel := r.cancel; cancel != nil {
|
|
// cancel != nil only on pool leaders
|
|
r.cancel = nil
|
|
cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// rebalanceMetrics provides rebalance trace support; it carries no state.
type rebalanceMetrics struct{}

// globalRebalanceMetrics is the shared instance used to trace rebalance
// steps.
var globalRebalanceMetrics rebalanceMetrics
|
|
|
|
//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE

// rebalanceMetric identifies the rebalance step a trace entry belongs to.
type rebalanceMetric uint8

const (
	// rebalanceMetricRebalanceBuckets: a whole rebalanceBuckets run for a pool.
	rebalanceMetricRebalanceBuckets rebalanceMetric = iota
	// rebalanceMetricRebalanceBucket: rebalancing one bucket.
	rebalanceMetricRebalanceBucket
	// rebalanceMetricRebalanceObject: copying one object version.
	rebalanceMetricRebalanceObject
	// rebalanceMetricRebalanceRemoveObject: removing a fully rebalanced object.
	rebalanceMetricRebalanceRemoveObject
	// rebalanceMetricSaveMetadata: saving the rebalance metadata.
	rebalanceMetricSaveMetadata
)
|
|
|
|
func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo {
|
|
var errStr string
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
return madmin.TraceInfo{
|
|
TraceType: madmin.TraceRebalance,
|
|
Time: startTime,
|
|
NodeName: globalLocalNodeName,
|
|
FuncName: fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
|
|
Duration: duration,
|
|
Path: path,
|
|
Error: errStr,
|
|
}
|
|
}
|
|
|
|
func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) {
|
|
startTime := time.Now()
|
|
return func(err error) {
|
|
duration := time.Since(startTime)
|
|
if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
|
|
globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " ")))
|
|
}
|
|
}
|
|
}
|