minio/cmd/erasure-server-pool-rebalance.go
Anis Eleuch 95bf4a57b6
logging: Add subsystem to log API (#19002)
Create new code paths for multiple subsystems in the code. This will
make maintaing this easier later.

Also introduce bugLogIf() for errors that should not happen in the first
place.
2024-04-04 05:04:40 -07:00

971 lines
26 KiB
Go

// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"strings"
"time"
"github.com/dustin/go-humanize"
"github.com/lithammer/shortuuid/v4"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/hash"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/env"
"github.com/minio/pkg/v2/workers"
)
//go:generate msgp -file $GOFILE -unexported
// rebalanceStats contains per-pool rebalance statistics like number of objects,
// versions and bytes rebalanced out of a pool
type rebalanceStats struct {
InitFreeSpace uint64 `json:"initFreeSpace" msg:"ifs"` // Pool free space at the start of rebalance
InitCapacity uint64 `json:"initCapacity" msg:"ic"` // Pool capacity at the start of rebalance
Buckets []string `json:"buckets" msg:"bus"` // buckets being rebalanced or to be rebalanced
RebalancedBuckets []string `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced
Bucket string `json:"bucket" msg:"bu"` // Last rebalanced bucket
Object string `json:"object" msg:"ob"` // Last rebalanced object
NumObjects uint64 `json:"numObjects" msg:"no"` // Number of objects rebalanced
NumVersions uint64 `json:"numVersions" msg:"nv"` // Number of versions rebalanced
Bytes uint64 `json:"bytes" msg:"bs"` // Number of bytes rebalanced
Participating bool `json:"participating" msg:"par"`
Info rebalanceInfo `json:"info" msg:"inf"`
}
func (rs *rebalanceStats) update(bucket string, fi FileInfo) {
if fi.IsLatest {
rs.NumObjects++
}
rs.NumVersions++
onDiskSz := int64(0)
if !fi.Deleted {
onDiskSz = fi.Size * int64(fi.Erasure.DataBlocks+fi.Erasure.ParityBlocks) / int64(fi.Erasure.DataBlocks)
}
rs.Bytes += uint64(onDiskSz)
rs.Bucket = bucket
rs.Object = fi.Name
}
type rstats []*rebalanceStats
//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE
type rebalStatus uint8
const (
rebalNone rebalStatus = iota
rebalStarted
rebalCompleted
rebalStopped
rebalFailed
)
type rebalanceInfo struct {
StartTime time.Time `msg:"startTs"` // Time at which rebalance-start was issued
EndTime time.Time `msg:"stopTs"` // Time at which rebalance operation completed or rebalance-stop was called
Status rebalStatus `msg:"status"` // Current state of rebalance operation. One of Started|Stopped|Completed|Failed.
}
// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
type rebalanceMeta struct {
cancel context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop
lastRefreshedAt time.Time `msg:"-"`
StoppedAt time.Time `msg:"stopTs"` // Time when rebalance-stop was issued.
ID string `msg:"id"` // ID of the ongoing rebalance operation
PercentFreeGoal float64 `msg:"pf"` // Computed from total free space and capacity at the start of rebalance
PoolStats []*rebalanceStats `msg:"rss"` // Per-pool rebalance stats keyed by pool index
}
var errRebalanceNotStarted = errors.New("rebalance not started")
func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
r := &rebalanceMeta{}
err := r.load(ctx, z.serverPools[0])
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil
}
return err
}
z.rebalMu.Lock()
if len(r.PoolStats) == len(z.serverPools) {
z.rebalMeta = r
} else {
z.updateRebalanceStats(ctx)
}
z.rebalMu.Unlock()
return nil
}
// updates rebalance.bin from let's say 2 pool setup in the middle
// of a rebalance, was expanded can cause z.rebalMeta to be outdated
// due to a missing new pool. This function tries to handle this
// scenario, albeit rare it seems to have occurred in the wild.
//
// since we do not explicitly disallow it, but it is okay for them
// expand and then we continue to rebalance.
func (z *erasureServerPools) updateRebalanceStats(ctx context.Context) error {
var ok bool
for i := range z.serverPools {
if z.findIndex(i) == -1 {
// Also ensure to initialize rebalanceStats to indicate
// its a new pool that can receive rebalanced data.
z.rebalMeta.PoolStats = append(z.rebalMeta.PoolStats, &rebalanceStats{})
ok = true
}
}
if ok {
lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
if err != nil {
rebalanceLogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
return err
}
defer lock.Unlock(lkCtx)
ctx = lkCtx.Context()
noLockOpts := ObjectOptions{NoLock: true}
return z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
}
return nil
}
func (z *erasureServerPools) findIndex(index int) int {
for i := 0; i < len(z.rebalMeta.PoolStats); i++ {
if i == index {
return index
}
}
return -1
}
// initRebalanceMeta initializes rebalance metadata for a new rebalance
// operation and saves it in the object store.
func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
r := &rebalanceMeta{
ID: shortuuid.New(),
PoolStats: make([]*rebalanceStats, len(z.serverPools)),
}
// Fetch disk capacity and available space.
si := z.StorageInfo(ctx, true)
diskStats := make([]struct {
AvailableSpace uint64
TotalSpace uint64
}, len(z.serverPools))
var totalCap, totalFree uint64
for _, disk := range si.Disks {
// Ignore invalid.
if disk.PoolIndex < 0 || len(diskStats) <= disk.PoolIndex {
// https://github.com/minio/minio/issues/16500
continue
}
totalCap += disk.TotalSpace
totalFree += disk.AvailableSpace
diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
}
r.PercentFreeGoal = float64(totalFree) / float64(totalCap)
now := time.Now()
for idx := range z.serverPools {
r.PoolStats[idx] = &rebalanceStats{
Buckets: make([]string, len(buckets)),
RebalancedBuckets: make([]string, 0, len(buckets)),
InitFreeSpace: diskStats[idx].AvailableSpace,
InitCapacity: diskStats[idx].TotalSpace,
}
copy(r.PoolStats[idx].Buckets, buckets)
if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal {
r.PoolStats[idx].Participating = true
r.PoolStats[idx].Info = rebalanceInfo{
StartTime: now,
Status: rebalStarted,
}
}
}
err = r.save(ctx, z.serverPools[0])
if err != nil {
return arn, err
}
z.rebalMeta = r
return r.ID, nil
}
func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, fi FileInfo) {
z.rebalMu.Lock()
defer z.rebalMu.Unlock()
r := z.rebalMeta
if r == nil {
return
}
r.PoolStats[poolIdx].update(bucket, fi)
}
const (
rebalMetaName = "rebalance.bin"
rebalMetaFmt = 1
rebalMetaVer = 1
)
func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
z.rebalMu.RLock()
defer z.rebalMu.RUnlock()
r := z.rebalMeta
if r == nil {
return "", false
}
ps := r.PoolStats[poolIdx]
if ps == nil {
return "", false
}
if ps.Info.Status == rebalCompleted || !ps.Participating {
return "", false
}
if len(ps.Buckets) == 0 {
return "", false
}
return ps.Buckets[0], true
}
func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
z.rebalMu.Lock()
defer z.rebalMu.Unlock()
ps := z.rebalMeta.PoolStats[poolIdx]
if ps == nil {
return
}
for i, b := range ps.Buckets {
if b == bucket {
ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
break
}
}
}
func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
return r.loadWithOpts(ctx, store, ObjectOptions{})
}
func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
if err != nil {
return err
}
if len(data) == 0 {
return nil
}
if len(data) <= 4 {
return fmt.Errorf("rebalanceMeta: no data")
}
// Read header
switch binary.LittleEndian.Uint16(data[0:2]) {
case rebalMetaFmt:
default:
return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case rebalMetaVer:
default:
return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
// OK, parse data.
if _, err = r.UnmarshalMsg(data[4:]); err != nil {
return err
}
r.lastRefreshedAt = time.Now()
return nil
}
func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
data := make([]byte, 4, r.Msgsize()+4)
// Initialize the header.
binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt)
binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer)
buf, err := r.MarshalMsg(data)
if err != nil {
return err
}
return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts)
}
func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
return r.saveWithOpts(ctx, store, ObjectOptions{})
}
func (z *erasureServerPools) IsRebalanceStarted() bool {
z.rebalMu.RLock()
defer z.rebalMu.RUnlock()
if r := z.rebalMeta; r != nil {
if r.StoppedAt.IsZero() {
return true
}
}
return false
}
func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
z.rebalMu.RLock()
defer z.rebalMu.RUnlock()
if r := z.rebalMeta; r != nil {
if !r.StoppedAt.IsZero() {
return false
}
ps := z.rebalMeta.PoolStats[poolIndex]
return ps.Participating && ps.Info.Status == rebalStarted
}
return false
}
func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
doneCh := make(chan struct{})
defer xioutil.SafeClose(doneCh)
// Save rebalance.bin periodically.
go func() {
// Update rebalance.bin periodically once every 5-10s, chosen randomly
// to avoid multiple pool leaders herding to update around the same
// time.
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randSleepFor := func() time.Duration {
return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64())
}
timer := time.NewTimer(randSleepFor())
defer timer.Stop()
var rebalDone bool
var traceMsg string
for {
select {
case <-doneCh:
// rebalance completed for poolIdx
now := time.Now()
z.rebalMu.Lock()
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
z.rebalMu.Unlock()
rebalDone = true
traceMsg = fmt.Sprintf("completed at %s", now)
case <-ctx.Done():
// rebalance stopped for poolIdx
now := time.Now()
z.rebalMu.Lock()
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
z.rebalMeta.cancel = nil // remove the already used context.CancelFunc
z.rebalMu.Unlock()
rebalDone = true
traceMsg = fmt.Sprintf("stopped at %s", now)
case <-timer.C:
traceMsg = fmt.Sprintf("saved at %s", time.Now())
}
stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats)
stopFn(err)
rebalanceLogIf(ctx, err)
timer.Reset(randSleepFor())
if rebalDone {
return
}
}
}()
rebalanceLogEvent(ctx, "Pool %d rebalancing is started", poolIdx+1)
for {
select {
case <-ctx.Done():
return
default:
}
bucket, ok := z.nextRebalBucket(poolIdx)
if !ok {
// no more buckets to rebalance or target free_space/capacity reached
break
}
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
err = z.rebalanceBucket(ctx, bucket, poolIdx)
if err != nil {
stopFn(err)
rebalanceLogIf(ctx, err)
return
}
stopFn(nil)
z.bucketRebalanceDone(bucket, poolIdx)
}
rebalanceLogEvent(ctx, "Pool %d rebalancing is done", poolIdx+1)
return err
}
func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
z.rebalMu.Lock()
defer z.rebalMu.Unlock()
// check if enough objects have been rebalanced
r := z.rebalMeta
poolStats := r.PoolStats[poolIdx]
if poolStats.Info.Status == rebalCompleted {
return true
}
pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity)
// Mark pool rebalance as done if within 5% from PercentFreeGoal.
if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 {
r.PoolStats[poolIdx].Info.Status = rebalCompleted
r.PoolStats[poolIdx].Info.EndTime = time.Now()
return true
}
return false
}
func (set *erasureObjects) listObjectsToRebalance(ctx context.Context, bucketName string, fn func(entry metaCacheEntry)) error {
disks, _ := set.getOnlineDisksWithHealing(false)
if len(disks) == 0 {
return fmt.Errorf("no online drives found for set with endpoints %s", set.getEndpoints())
}
// However many we ask, versions must exist on ~50%
listingQuorum := (set.setDriveCount + 1) / 2
// How to resolve partial results.
resolver := metadataResolutionParams{
dirQuorum: listingQuorum, // make sure to capture all quorum ratios
objQuorum: listingQuorum, // make sure to capture all quorum ratios
bucket: bucketName,
}
err := listPathRaw(ctx, listPathRawOptions{
disks: disks,
bucket: bucketName,
recursive: true,
forwardTo: "",
minDisks: listingQuorum,
reportNotFound: false,
agreed: fn,
partial: func(entries metaCacheEntries, _ []error) {
entry, ok := entries.resolve(&resolver)
if ok {
fn(*entry)
}
},
finished: nil,
})
return err
}
// rebalanceBucket rebalances objects under bucket in poolIdx pool
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error {
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
vc, _ := globalBucketVersioningSys.Get(bucket)
// Check if the current bucket has a configured lifecycle policy
lc, _ := globalLifecycleSys.Get(bucket)
// Check if bucket is object locked.
lr, _ := globalBucketObjectLockSys.Get(bucket)
rcfg, _ := getReplicationConfig(ctx, bucket)
pool := z.serverPools[poolIdx]
const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
workerSize, err := env.GetInt(envRebalanceWorkers, len(pool.sets))
if err != nil {
rebalanceLogIf(ctx, fmt.Errorf("invalid workers value err: %v, defaulting to %d", err, len(pool.sets)))
workerSize = len(pool.sets)
}
// Each decom worker needs one List() goroutine/worker
// add that many extra workers.
workerSize += len(pool.sets)
wk, err := workers.New(workerSize)
if err != nil {
return err
}
for setIdx, set := range pool.sets {
set := set
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
if lc == nil {
return false
}
versioned := vc != nil && vc.Versioned(object)
objInfo := fi.ToObjectInfo(bucket, object, versioned)
evt := evalActionFromLifecycle(ctx, *lc, lr, rcfg, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_Rebal)
return true
}
return false
}
rebalanceEntry := func(entry metaCacheEntry) {
defer wk.Give()
if entry.isDir() {
return
}
// rebalance on poolIdx has reached its goal
if z.checkIfRebalanceDone(poolIdx) {
return
}
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
return
}
// We need a reversed order for rebalance,
// to create the appropriate stack.
versionsSorter(fivs.Versions).reverse()
var rebalanced, expired int
for _, version := range fivs.Versions {
// Skip transitioned objects for now. TBD
if version.IsRemote() {
continue
}
// Apply lifecycle rules on the objects that are expired.
if filterLifecycle(bucket, version.Name, version) {
expired++
continue
}
// any object with only single DEL marker we don't need
// to rebalance, just skip it, this also includes
// any other versions that have already expired.
remainingVersions := len(fivs.Versions) - expired
if version.Deleted && remainingVersions == 1 {
rebalanced++
continue
}
versionID := version.VersionID
if versionID == "" {
versionID = nullVersionID
}
if version.Deleted {
_, err := z.DeleteObject(ctx,
bucket,
version.Name,
ObjectOptions{
Versioned: true,
VersionID: versionID,
MTime: version.ModTime,
DeleteReplication: version.ReplicationState,
DeleteMarker: true, // make sure we create a delete marker
SkipRebalancing: true, // make sure we skip the decommissioned pool
NoAuditLog: true,
})
var failure bool
if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
rebalanceLogIf(ctx, err)
failure = true
}
if !failure {
z.updatePoolStats(poolIdx, bucket, version)
rebalanced++
}
auditLogRebalance(ctx, "Rebalance:DeleteMarker", bucket, version.Name, versionID, err)
continue
}
var failure, ignore bool
for try := 0; try < 3; try++ {
// GetObjectReader.Close is called by rebalanceObject
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID)
gr, err := set.GetObjectNInfo(ctx,
bucket,
encodeDirObject(version.Name),
nil,
http.Header{},
ObjectOptions{
VersionID: versionID,
NoDecryption: true,
NoLock: true,
NoAuditLog: true,
})
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
// object deleted by the application, nothing to do here we move on.
ignore = true
stopFn(nil)
break
}
if err != nil {
failure = true
rebalanceLogIf(ctx, err)
stopFn(err)
continue
}
if err = z.rebalanceObject(ctx, bucket, gr); err != nil {
failure = true
rebalanceLogIf(ctx, err)
stopFn(err)
continue
}
stopFn(nil)
failure = false
break
}
if ignore {
continue
}
if failure {
break // break out on first error
}
z.updatePoolStats(poolIdx, bucket, version)
rebalanced++
}
// if all versions were rebalanced, we can delete the object versions.
if rebalanced == len(fivs.Versions) {
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name)
_, err := set.DeleteObject(ctx,
bucket,
encodeDirObject(entry.name),
ObjectOptions{
DeletePrefix: true, // use prefix delete to delete all versions at once.
DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls)
NoAuditLog: true,
},
)
stopFn(err)
auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
if err != nil {
rebalanceLogIf(ctx, err)
}
}
}
wk.Take()
go func(setIdx int) {
defer wk.Give()
err := set.listObjectsToRebalance(ctx, bucket,
func(entry metaCacheEntry) {
wk.Take()
go rebalanceEntry(entry)
},
)
if err == nil || errors.Is(err, context.Canceled) {
return
}
setN := humanize.Ordinal(setIdx + 1)
rebalanceLogIf(ctx, fmt.Errorf("listing objects from %s set failed with %v", setN, err), "rebalance-listing-failed"+setN)
}(setIdx)
}
wk.Wait()
return nil
}
type rebalSaveOpts uint8
const (
rebalSaveStats rebalSaveOpts = iota
rebalSaveStoppedAt
)
func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error {
lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
if err != nil {
rebalanceLogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
return err
}
defer lock.Unlock(lkCtx)
ctx = lkCtx.Context()
noLockOpts := ObjectOptions{NoLock: true}
r := &rebalanceMeta{}
if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil {
return err
}
z.rebalMu.Lock()
defer z.rebalMu.Unlock()
switch opts {
case rebalSaveStoppedAt:
r.StoppedAt = time.Now()
case rebalSaveStats:
r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx]
}
z.rebalMeta = r
return z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
}
func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
errStr := ""
if err != nil {
errStr = err.Error()
}
auditLogInternal(ctx, AuditLogOptions{
Event: "rebalance",
APIName: apiName,
Bucket: bucket,
Object: object,
VersionID: versionID,
Error: errStr,
})
}
func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
oi := gr.ObjInfo
defer func() {
gr.Close()
auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
}()
actualSize, err := oi.GetActualSize()
if err != nil {
return err
}
if oi.isMultipart() {
res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
VersionID: oi.VersionID,
UserDefined: oi.UserDefined,
NoAuditLog: true,
})
if err != nil {
return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
}
defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{NoAuditLog: true})
parts := make([]CompletePart, len(oi.Parts))
for i, part := range oi.Parts {
hr, err := hash.NewReader(ctx, io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
if err != nil {
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
}
pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
part.Number,
NewPutObjReader(hr),
ObjectOptions{
PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
IndexCB: func() []byte {
return part.Index // Preserve part Index to ensure decompression works.
},
NoAuditLog: true,
})
if err != nil {
return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
}
parts[i] = CompletePart{
ETag: pi.ETag,
PartNumber: pi.PartNumber,
}
}
_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
DataMovement: true,
MTime: oi.ModTime,
NoAuditLog: true,
})
if err != nil {
err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
}
return err
}
hr, err := hash.NewReader(ctx, gr, oi.Size, "", "", actualSize)
if err != nil {
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
}
_, err = z.PutObject(ctx,
bucket,
oi.Name,
NewPutObjReader(hr),
ObjectOptions{
DataMovement: true,
VersionID: oi.VersionID,
MTime: oi.ModTime,
UserDefined: oi.UserDefined,
PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
IndexCB: func() []byte {
return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
},
NoAuditLog: true,
})
if err != nil {
err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
}
return err
}
func (z *erasureServerPools) StartRebalance() {
z.rebalMu.Lock()
if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do
z.rebalMu.Unlock()
return
}
ctx, cancel := context.WithCancel(GlobalContext)
z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called
z.rebalMu.Unlock()
z.rebalMu.RLock()
participants := make([]bool, len(z.rebalMeta.PoolStats))
for i, ps := range z.rebalMeta.PoolStats {
// skip pools which have completed rebalancing
if ps.Info.Status != rebalStarted {
continue
}
participants[i] = ps.Participating
}
z.rebalMu.RUnlock()
for poolIdx, doRebalance := range participants {
if !doRebalance {
continue
}
// nothing to do if this node is not pool's first node (i.e pool's rebalance 'leader').
if !globalEndpoints[poolIdx].Endpoints[0].IsLocal {
continue
}
go func(idx int) {
stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
err := z.rebalanceBuckets(ctx, idx)
stopfn(err)
}(poolIdx)
}
}
// StopRebalance signals the rebalance goroutine running on this node (if any)
// to stop, using the context.CancelFunc(s) saved at the time ofStartRebalance.
func (z *erasureServerPools) StopRebalance() error {
z.rebalMu.Lock()
defer z.rebalMu.Unlock()
r := z.rebalMeta
if r == nil { // rebalance not running in this node, nothing to do
return nil
}
if cancel := r.cancel; cancel != nil {
// cancel != nil only on pool leaders
r.cancel = nil
cancel()
}
return nil
}
// for rebalance trace support
type rebalanceMetrics struct{}
var globalRebalanceMetrics rebalanceMetrics
//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE
type rebalanceMetric uint8
const (
rebalanceMetricRebalanceBuckets rebalanceMetric = iota
rebalanceMetricRebalanceBucket
rebalanceMetricRebalanceObject
rebalanceMetricRebalanceRemoveObject
rebalanceMetricSaveMetadata
)
func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo {
var errStr string
if err != nil {
errStr = err.Error()
}
return madmin.TraceInfo{
TraceType: madmin.TraceRebalance,
Time: startTime,
NodeName: globalLocalNodeName,
FuncName: fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
Duration: duration,
Path: path,
Error: errStr,
}
}
func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) {
startTime := time.Now()
return func(err error) {
duration := time.Since(startTime)
if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " ")))
}
}
}