mirror of
https://github.com/minio/minio.git
synced 2025-01-16 01:03:15 -05:00
64e803b136
rebalance metadata is good to have only, if it cannot be loaded when starting MinIO for some reason we can possibly ignore it and move on and let user start rebalance again if needed.
1031 lines
28 KiB
Go
1031 lines
28 KiB
Go
// Copyright (c) 2015-2022 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"math/rand"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/lithammer/shortuuid/v4"
|
|
"github.com/minio/madmin-go/v3"
|
|
"github.com/minio/minio/internal/bucket/lifecycle"
|
|
objectlock "github.com/minio/minio/internal/bucket/object/lock"
|
|
"github.com/minio/minio/internal/bucket/replication"
|
|
"github.com/minio/minio/internal/bucket/versioning"
|
|
"github.com/minio/minio/internal/hash"
|
|
xioutil "github.com/minio/minio/internal/ioutil"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/v3/env"
|
|
"github.com/minio/pkg/v3/workers"
|
|
)
|
|
|
|
//go:generate msgp -file $GOFILE -unexported
|
|
|
|
// rebalanceStats contains per-pool rebalance statistics like number of objects,
|
|
// versions and bytes rebalanced out of a pool
|
|
type rebalanceStats struct {
|
|
InitFreeSpace uint64 `json:"initFreeSpace" msg:"ifs"` // Pool free space at the start of rebalance
|
|
InitCapacity uint64 `json:"initCapacity" msg:"ic"` // Pool capacity at the start of rebalance
|
|
|
|
Buckets []string `json:"buckets" msg:"bus"` // buckets being rebalanced or to be rebalanced
|
|
RebalancedBuckets []string `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced
|
|
Bucket string `json:"bucket" msg:"bu"` // Last rebalanced bucket
|
|
Object string `json:"object" msg:"ob"` // Last rebalanced object
|
|
NumObjects uint64 `json:"numObjects" msg:"no"` // Number of objects rebalanced
|
|
NumVersions uint64 `json:"numVersions" msg:"nv"` // Number of versions rebalanced
|
|
Bytes uint64 `json:"bytes" msg:"bs"` // Number of bytes rebalanced
|
|
Participating bool `json:"participating" msg:"par"`
|
|
Info rebalanceInfo `json:"info" msg:"inf"`
|
|
}
|
|
|
|
func (rs *rebalanceStats) update(bucket string, fi FileInfo) {
|
|
if fi.IsLatest {
|
|
rs.NumObjects++
|
|
}
|
|
|
|
rs.NumVersions++
|
|
onDiskSz := int64(0)
|
|
if !fi.Deleted {
|
|
onDiskSz = fi.Size * int64(fi.Erasure.DataBlocks+fi.Erasure.ParityBlocks) / int64(fi.Erasure.DataBlocks)
|
|
}
|
|
rs.Bytes += uint64(onDiskSz)
|
|
rs.Bucket = bucket
|
|
rs.Object = fi.Name
|
|
}
|
|
|
|
type rstats []*rebalanceStats
|
|
|
|
//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE
|
|
type rebalStatus uint8
|
|
|
|
const (
|
|
rebalNone rebalStatus = iota
|
|
rebalStarted
|
|
rebalCompleted
|
|
rebalStopped
|
|
rebalFailed
|
|
)
|
|
|
|
type rebalanceInfo struct {
|
|
StartTime time.Time `msg:"startTs"` // Time at which rebalance-start was issued
|
|
EndTime time.Time `msg:"stopTs"` // Time at which rebalance operation completed or rebalance-stop was called
|
|
Status rebalStatus `msg:"status"` // Current state of rebalance operation. One of Started|Stopped|Completed|Failed.
|
|
}
|
|
|
|
// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
|
|
type rebalanceMeta struct {
|
|
cancel context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop
|
|
lastRefreshedAt time.Time `msg:"-"`
|
|
StoppedAt time.Time `msg:"stopTs"` // Time when rebalance-stop was issued.
|
|
ID string `msg:"id"` // ID of the ongoing rebalance operation
|
|
PercentFreeGoal float64 `msg:"pf"` // Computed from total free space and capacity at the start of rebalance
|
|
PoolStats []*rebalanceStats `msg:"rss"` // Per-pool rebalance stats keyed by pool index
|
|
}
|
|
|
|
var errRebalanceNotStarted = errors.New("rebalance not started")
|
|
|
|
func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
|
|
r := &rebalanceMeta{}
|
|
if err := r.load(ctx, z.serverPools[0]); err == nil {
|
|
z.rebalMu.Lock()
|
|
z.rebalMeta = r
|
|
z.updateRebalanceStats(ctx)
|
|
z.rebalMu.Unlock()
|
|
} else if !errors.Is(err, errConfigNotFound) {
|
|
rebalanceLogIf(ctx, fmt.Errorf("failed to load rebalance metadata, continue to restart rebalance as needed: %w", err))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updates rebalance.bin from let's say 2 pool setup in the middle
|
|
// of a rebalance, was expanded can cause z.rebalMeta to be outdated
|
|
// due to a missing new pool. This function tries to handle this
|
|
// scenario, albeit rare it seems to have occurred in the wild.
|
|
//
|
|
// since we do not explicitly disallow it, but it is okay for them
|
|
// expand and then we continue to rebalance.
|
|
func (z *erasureServerPools) updateRebalanceStats(ctx context.Context) error {
|
|
var ok bool
|
|
for i := range z.serverPools {
|
|
if z.findIndex(i) == -1 {
|
|
// Also ensure to initialize rebalanceStats to indicate
|
|
// its a new pool that can receive rebalanced data.
|
|
z.rebalMeta.PoolStats = append(z.rebalMeta.PoolStats, &rebalanceStats{})
|
|
ok = true
|
|
}
|
|
}
|
|
if ok {
|
|
return z.rebalMeta.save(ctx, z.serverPools[0])
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (z *erasureServerPools) findIndex(index int) int {
|
|
if z.rebalMeta == nil {
|
|
return 0
|
|
}
|
|
for i := 0; i < len(z.rebalMeta.PoolStats); i++ {
|
|
if i == index {
|
|
return index
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// initRebalanceMeta initializes rebalance metadata for a new rebalance
|
|
// operation and saves it in the object store.
|
|
func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
|
|
r := &rebalanceMeta{
|
|
ID: shortuuid.New(),
|
|
PoolStats: make([]*rebalanceStats, len(z.serverPools)),
|
|
}
|
|
|
|
// Fetch disk capacity and available space.
|
|
si := z.StorageInfo(ctx, true)
|
|
diskStats := make([]struct {
|
|
AvailableSpace uint64
|
|
TotalSpace uint64
|
|
}, len(z.serverPools))
|
|
var totalCap, totalFree uint64
|
|
for _, disk := range si.Disks {
|
|
// Ignore invalid.
|
|
if disk.PoolIndex < 0 || len(diskStats) <= disk.PoolIndex {
|
|
// https://github.com/minio/minio/issues/16500
|
|
continue
|
|
}
|
|
totalCap += disk.TotalSpace
|
|
totalFree += disk.AvailableSpace
|
|
|
|
diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
|
|
diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
|
|
}
|
|
r.PercentFreeGoal = float64(totalFree) / float64(totalCap)
|
|
|
|
now := time.Now()
|
|
for idx := range z.serverPools {
|
|
r.PoolStats[idx] = &rebalanceStats{
|
|
Buckets: make([]string, len(buckets)),
|
|
RebalancedBuckets: make([]string, 0, len(buckets)),
|
|
InitFreeSpace: diskStats[idx].AvailableSpace,
|
|
InitCapacity: diskStats[idx].TotalSpace,
|
|
}
|
|
copy(r.PoolStats[idx].Buckets, buckets)
|
|
|
|
if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal {
|
|
r.PoolStats[idx].Participating = true
|
|
r.PoolStats[idx].Info = rebalanceInfo{
|
|
StartTime: now,
|
|
Status: rebalStarted,
|
|
}
|
|
}
|
|
}
|
|
|
|
err = r.save(ctx, z.serverPools[0])
|
|
if err != nil {
|
|
return arn, err
|
|
}
|
|
|
|
z.rebalMeta = r
|
|
return r.ID, nil
|
|
}
|
|
|
|
func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, fi FileInfo) {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.PoolStats[poolIdx].update(bucket, fi)
|
|
}
|
|
|
|
const (
|
|
rebalMetaName = "rebalance.bin"
|
|
rebalMetaFmt = 1
|
|
rebalMetaVer = 1
|
|
)
|
|
|
|
func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
|
|
z.rebalMu.RLock()
|
|
defer z.rebalMu.RUnlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil {
|
|
return "", false
|
|
}
|
|
|
|
ps := r.PoolStats[poolIdx]
|
|
if ps == nil {
|
|
return "", false
|
|
}
|
|
|
|
if ps.Info.Status == rebalCompleted || !ps.Participating {
|
|
return "", false
|
|
}
|
|
|
|
if len(ps.Buckets) == 0 {
|
|
return "", false
|
|
}
|
|
|
|
return ps.Buckets[0], true
|
|
}
|
|
|
|
func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
if z.rebalMeta == nil {
|
|
return
|
|
}
|
|
|
|
ps := z.rebalMeta.PoolStats[poolIdx]
|
|
if ps == nil {
|
|
return
|
|
}
|
|
|
|
for i, b := range ps.Buckets {
|
|
if b == bucket {
|
|
ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
|
|
ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
|
|
return r.loadWithOpts(ctx, store, ObjectOptions{})
|
|
}
|
|
|
|
func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
|
|
data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(data) == 0 {
|
|
return nil
|
|
}
|
|
if len(data) <= 4 {
|
|
return fmt.Errorf("rebalanceMeta: no data")
|
|
}
|
|
|
|
// Read header
|
|
switch binary.LittleEndian.Uint16(data[0:2]) {
|
|
case rebalMetaFmt:
|
|
default:
|
|
return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
|
|
}
|
|
switch binary.LittleEndian.Uint16(data[2:4]) {
|
|
case rebalMetaVer:
|
|
default:
|
|
return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
|
|
}
|
|
|
|
// OK, parse data.
|
|
if _, err = r.UnmarshalMsg(data[4:]); err != nil {
|
|
return err
|
|
}
|
|
|
|
r.lastRefreshedAt = time.Now()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
|
|
if r == nil {
|
|
return nil
|
|
}
|
|
|
|
data := make([]byte, 4, r.Msgsize()+4)
|
|
|
|
// Initialize the header.
|
|
binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt)
|
|
binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer)
|
|
|
|
buf, err := r.MarshalMsg(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts)
|
|
}
|
|
|
|
func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
|
|
return r.saveWithOpts(ctx, store, ObjectOptions{})
|
|
}
|
|
|
|
func (z *erasureServerPools) IsRebalanceStarted() bool {
|
|
z.rebalMu.RLock()
|
|
defer z.rebalMu.RUnlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil {
|
|
return false
|
|
}
|
|
if !r.StoppedAt.IsZero() {
|
|
return false
|
|
}
|
|
for _, ps := range r.PoolStats {
|
|
if ps.Participating && ps.Info.Status != rebalCompleted {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
|
|
z.rebalMu.RLock()
|
|
defer z.rebalMu.RUnlock()
|
|
|
|
if r := z.rebalMeta; r != nil {
|
|
if !r.StoppedAt.IsZero() {
|
|
return false
|
|
}
|
|
ps := r.PoolStats[poolIndex]
|
|
return ps.Participating && ps.Info.Status == rebalStarted
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
|
|
doneCh := make(chan error, 1)
|
|
defer xioutil.SafeClose(doneCh)
|
|
|
|
// Save rebalance.bin periodically.
|
|
go func() {
|
|
// Update rebalance.bin periodically once every 5-10s, chosen randomly
|
|
// to avoid multiple pool leaders herding to update around the same
|
|
// time.
|
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
randSleepFor := func() time.Duration {
|
|
return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64())
|
|
}
|
|
|
|
timer := time.NewTimer(randSleepFor())
|
|
defer timer.Stop()
|
|
|
|
var (
|
|
quit bool
|
|
traceMsg string
|
|
)
|
|
|
|
for {
|
|
select {
|
|
case rebalErr := <-doneCh:
|
|
quit = true
|
|
now := time.Now()
|
|
var status rebalStatus
|
|
|
|
switch {
|
|
case errors.Is(rebalErr, context.Canceled):
|
|
status = rebalStopped
|
|
traceMsg = fmt.Sprintf("stopped at %s", now)
|
|
case rebalErr == nil:
|
|
status = rebalCompleted
|
|
traceMsg = fmt.Sprintf("completed at %s", now)
|
|
default:
|
|
status = rebalFailed
|
|
traceMsg = fmt.Sprintf("stopped at %s with err: %v", now, rebalErr)
|
|
}
|
|
|
|
z.rebalMu.Lock()
|
|
z.rebalMeta.PoolStats[poolIdx].Info.Status = status
|
|
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
|
|
z.rebalMu.Unlock()
|
|
|
|
case <-timer.C:
|
|
traceMsg = fmt.Sprintf("saved at %s", time.Now())
|
|
}
|
|
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
|
|
err := z.saveRebalanceStats(GlobalContext, poolIdx, rebalSaveStats)
|
|
stopFn(0, err)
|
|
rebalanceLogIf(GlobalContext, err)
|
|
|
|
if quit {
|
|
return
|
|
}
|
|
|
|
timer.Reset(randSleepFor())
|
|
}
|
|
}()
|
|
|
|
rebalanceLogEvent(ctx, "Pool %d rebalancing is started", poolIdx+1)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
doneCh <- ctx.Err()
|
|
return
|
|
default:
|
|
}
|
|
|
|
bucket, ok := z.nextRebalBucket(poolIdx)
|
|
if !ok {
|
|
// no more buckets to rebalance or target free_space/capacity reached
|
|
break
|
|
}
|
|
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
|
|
if err = z.rebalanceBucket(ctx, bucket, poolIdx); err != nil {
|
|
stopFn(0, err)
|
|
if errors.Is(err, errServerNotInitialized) || errors.Is(err, errBucketMetadataNotInitialized) {
|
|
continue
|
|
}
|
|
rebalanceLogIf(GlobalContext, err)
|
|
doneCh <- err
|
|
return
|
|
}
|
|
stopFn(0, nil)
|
|
z.bucketRebalanceDone(bucket, poolIdx)
|
|
}
|
|
|
|
rebalanceLogEvent(GlobalContext, "Pool %d rebalancing is done", poolIdx+1)
|
|
|
|
return err
|
|
}
|
|
|
|
func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
// check if enough objects have been rebalanced
|
|
r := z.rebalMeta
|
|
poolStats := r.PoolStats[poolIdx]
|
|
if poolStats.Info.Status == rebalCompleted {
|
|
return true
|
|
}
|
|
|
|
pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity)
|
|
// Mark pool rebalance as done if within 5% from PercentFreeGoal.
|
|
if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 {
|
|
r.PoolStats[poolIdx].Info.Status = rebalCompleted
|
|
r.PoolStats[poolIdx].Info.EndTime = time.Now()
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (set *erasureObjects) listObjectsToRebalance(ctx context.Context, bucketName string, fn func(entry metaCacheEntry)) error {
|
|
disks, _ := set.getOnlineDisksWithHealing(false)
|
|
if len(disks) == 0 {
|
|
return fmt.Errorf("no online drives found for set with endpoints %s", set.getEndpoints())
|
|
}
|
|
|
|
// However many we ask, versions must exist on ~50%
|
|
listingQuorum := (set.setDriveCount + 1) / 2
|
|
|
|
// How to resolve partial results.
|
|
resolver := metadataResolutionParams{
|
|
dirQuorum: listingQuorum, // make sure to capture all quorum ratios
|
|
objQuorum: listingQuorum, // make sure to capture all quorum ratios
|
|
bucket: bucketName,
|
|
}
|
|
|
|
err := listPathRaw(ctx, listPathRawOptions{
|
|
disks: disks,
|
|
bucket: bucketName,
|
|
recursive: true,
|
|
forwardTo: "",
|
|
minDisks: listingQuorum,
|
|
reportNotFound: false,
|
|
agreed: fn,
|
|
partial: func(entries metaCacheEntries, _ []error) {
|
|
entry, ok := entries.resolve(&resolver)
|
|
if ok {
|
|
fn(*entry)
|
|
}
|
|
},
|
|
finished: nil,
|
|
})
|
|
return err
|
|
}
|
|
|
|
// rebalanceBucket rebalances objects under bucket in poolIdx pool
|
|
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) (err error) {
|
|
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
|
|
|
|
var vc *versioning.Versioning
|
|
var lc *lifecycle.Lifecycle
|
|
var lr objectlock.Retention
|
|
var rcfg *replication.Config
|
|
if bucket != minioMetaBucket {
|
|
vc, err = globalBucketVersioningSys.Get(bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check if the current bucket has a configured lifecycle policy
|
|
lc, err = globalLifecycleSys.Get(bucket)
|
|
if err != nil && !errors.Is(err, BucketLifecycleNotFound{Bucket: bucket}) {
|
|
return err
|
|
}
|
|
|
|
// Check if bucket is object locked.
|
|
lr, err = globalBucketObjectLockSys.Get(bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rcfg, err = getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
pool := z.serverPools[poolIdx]
|
|
|
|
const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
|
|
workerSize, err := env.GetInt(envRebalanceWorkers, len(pool.sets))
|
|
if err != nil {
|
|
rebalanceLogIf(ctx, fmt.Errorf("invalid workers value err: %v, defaulting to %d", err, len(pool.sets)))
|
|
workerSize = len(pool.sets)
|
|
}
|
|
|
|
// Each decom worker needs one List() goroutine/worker
|
|
// add that many extra workers.
|
|
workerSize += len(pool.sets)
|
|
|
|
wk, err := workers.New(workerSize)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for setIdx, set := range pool.sets {
|
|
set := set
|
|
|
|
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
|
|
if lc == nil {
|
|
return false
|
|
}
|
|
versioned := vc != nil && vc.Versioned(object)
|
|
objInfo := fi.ToObjectInfo(bucket, object, versioned)
|
|
|
|
evt := evalActionFromLifecycle(ctx, *lc, lr, rcfg, objInfo)
|
|
if evt.Action.Delete() {
|
|
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_Rebal)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
rebalanceEntry := func(entry metaCacheEntry) {
|
|
defer wk.Give()
|
|
|
|
if entry.isDir() {
|
|
return
|
|
}
|
|
|
|
// rebalance on poolIdx has reached its goal
|
|
if z.checkIfRebalanceDone(poolIdx) {
|
|
return
|
|
}
|
|
|
|
fivs, err := entry.fileInfoVersions(bucket)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// We need a reversed order for rebalance,
|
|
// to create the appropriate stack.
|
|
versionsSorter(fivs.Versions).reverse()
|
|
|
|
var rebalanced, expired int
|
|
for _, version := range fivs.Versions {
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID)
|
|
|
|
// Skip transitioned objects for now. TBD
|
|
if version.IsRemote() {
|
|
stopFn(version.Size, errors.New("ILM Tiered version will be skipped for now"))
|
|
continue
|
|
}
|
|
|
|
// Apply lifecycle rules on the objects that are expired.
|
|
if filterLifecycle(bucket, version.Name, version) {
|
|
expired++
|
|
stopFn(version.Size, errors.New("ILM expired object/version will be skipped"))
|
|
continue
|
|
}
|
|
|
|
// any object with only single DEL marker we don't need
|
|
// to rebalance, just skip it, this also includes
|
|
// any other versions that have already expired.
|
|
remainingVersions := len(fivs.Versions) - expired
|
|
if version.Deleted && remainingVersions == 1 {
|
|
rebalanced++
|
|
stopFn(version.Size, errors.New("DELETE marked object with no other non-current versions will be skipped"))
|
|
continue
|
|
}
|
|
|
|
versionID := version.VersionID
|
|
if versionID == "" {
|
|
versionID = nullVersionID
|
|
}
|
|
|
|
var failure, ignore bool
|
|
if version.Deleted {
|
|
_, err := z.DeleteObject(ctx,
|
|
bucket,
|
|
version.Name,
|
|
ObjectOptions{
|
|
Versioned: true,
|
|
VersionID: versionID,
|
|
MTime: version.ModTime,
|
|
DeleteReplication: version.ReplicationState,
|
|
SrcPoolIdx: poolIdx,
|
|
DataMovement: true,
|
|
DeleteMarker: true, // make sure we create a delete marker
|
|
SkipRebalancing: true, // make sure we skip the decommissioned pool
|
|
NoAuditLog: true,
|
|
})
|
|
// This can happen when rebalance stop races with ongoing rebalance workers.
|
|
// These rebalance failures can be ignored.
|
|
if err != nil {
|
|
// This can happen when rebalance stop races with ongoing rebalance workers.
|
|
// These rebalance failures can be ignored.
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
|
|
ignore = true
|
|
stopFn(0, nil)
|
|
continue
|
|
}
|
|
}
|
|
stopFn(version.Size, err)
|
|
rebalanceLogIf(ctx, err)
|
|
failure = err != nil
|
|
if !failure {
|
|
z.updatePoolStats(poolIdx, bucket, version)
|
|
rebalanced++
|
|
}
|
|
auditLogRebalance(ctx, "Rebalance:DeleteMarker", bucket, version.Name, versionID, err)
|
|
continue
|
|
}
|
|
|
|
for try := 0; try < 3; try++ {
|
|
// GetObjectReader.Close is called by rebalanceObject
|
|
gr, err := set.GetObjectNInfo(ctx,
|
|
bucket,
|
|
encodeDirObject(version.Name),
|
|
nil,
|
|
http.Header{},
|
|
ObjectOptions{
|
|
VersionID: versionID,
|
|
NoDecryption: true,
|
|
NoLock: true,
|
|
NoAuditLog: true,
|
|
})
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
// object deleted by the application, nothing to do here we move on.
|
|
ignore = true
|
|
stopFn(0, nil)
|
|
break
|
|
}
|
|
if err != nil {
|
|
failure = true
|
|
rebalanceLogIf(ctx, err)
|
|
stopFn(0, err)
|
|
continue
|
|
}
|
|
|
|
if err = z.rebalanceObject(ctx, poolIdx, bucket, gr); err != nil {
|
|
// This can happen when rebalance stop races with ongoing rebalance workers.
|
|
// These rebalance failures can be ignored.
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
|
|
ignore = true
|
|
stopFn(0, nil)
|
|
break
|
|
}
|
|
failure = true
|
|
rebalanceLogIf(ctx, err)
|
|
stopFn(version.Size, err)
|
|
continue
|
|
}
|
|
|
|
stopFn(version.Size, nil)
|
|
failure = false
|
|
break
|
|
}
|
|
if ignore {
|
|
continue
|
|
}
|
|
if failure {
|
|
break // break out on first error
|
|
}
|
|
z.updatePoolStats(poolIdx, bucket, version)
|
|
rebalanced++
|
|
}
|
|
|
|
// if all versions were rebalanced, we can delete the object versions.
|
|
if rebalanced == len(fivs.Versions) {
|
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name)
|
|
_, err := set.DeleteObject(ctx,
|
|
bucket,
|
|
encodeDirObject(entry.name),
|
|
ObjectOptions{
|
|
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
|
DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls)
|
|
NoAuditLog: true,
|
|
},
|
|
)
|
|
stopFn(0, err)
|
|
auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
|
|
if err != nil {
|
|
rebalanceLogIf(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
wk.Take()
|
|
go func(setIdx int) {
|
|
defer wk.Give()
|
|
err := set.listObjectsToRebalance(ctx, bucket,
|
|
func(entry metaCacheEntry) {
|
|
wk.Take()
|
|
go rebalanceEntry(entry)
|
|
},
|
|
)
|
|
if err == nil || errors.Is(err, context.Canceled) {
|
|
return
|
|
}
|
|
setN := humanize.Ordinal(setIdx + 1)
|
|
rebalanceLogIf(ctx, fmt.Errorf("listing objects from %s set failed with %v", setN, err), "rebalance-listing-failed"+setN)
|
|
}(setIdx)
|
|
}
|
|
|
|
wk.Wait()
|
|
return nil
|
|
}
|
|
|
|
type rebalSaveOpts uint8
|
|
|
|
const (
|
|
rebalSaveStats rebalSaveOpts = iota
|
|
rebalSaveStoppedAt
|
|
)
|
|
|
|
func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error {
|
|
lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
|
|
lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
rebalanceLogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
|
|
return err
|
|
}
|
|
defer lock.Unlock(lkCtx)
|
|
|
|
ctx = lkCtx.Context()
|
|
noLockOpts := ObjectOptions{NoLock: true}
|
|
r := &rebalanceMeta{}
|
|
if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil {
|
|
return err
|
|
}
|
|
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
switch opts {
|
|
case rebalSaveStoppedAt:
|
|
r.StoppedAt = time.Now()
|
|
case rebalSaveStats:
|
|
if z.rebalMeta != nil {
|
|
r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx]
|
|
}
|
|
}
|
|
z.rebalMeta = r
|
|
|
|
return z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
|
|
}
|
|
|
|
func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
|
|
errStr := ""
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
auditLogInternal(ctx, AuditLogOptions{
|
|
Event: "rebalance",
|
|
APIName: apiName,
|
|
Bucket: bucket,
|
|
Object: object,
|
|
VersionID: versionID,
|
|
Error: errStr,
|
|
})
|
|
}
|
|
|
|
func (z *erasureServerPools) rebalanceObject(ctx context.Context, poolIdx int, bucket string, gr *GetObjectReader) (err error) {
|
|
oi := gr.ObjInfo
|
|
|
|
defer func() {
|
|
gr.Close()
|
|
auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
|
|
}()
|
|
|
|
actualSize, err := oi.GetActualSize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if oi.isMultipart() {
|
|
res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
|
|
VersionID: oi.VersionID,
|
|
UserDefined: oi.UserDefined,
|
|
NoAuditLog: true,
|
|
DataMovement: true,
|
|
SrcPoolIdx: poolIdx,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
|
|
}
|
|
defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{NoAuditLog: true})
|
|
|
|
parts := make([]CompletePart, len(oi.Parts))
|
|
for i, part := range oi.Parts {
|
|
hr, err := hash.NewReader(ctx, io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
|
|
}
|
|
pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
|
|
part.Number,
|
|
NewPutObjReader(hr),
|
|
ObjectOptions{
|
|
PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
|
|
IndexCB: func() []byte {
|
|
return part.Index // Preserve part Index to ensure decompression works.
|
|
},
|
|
NoAuditLog: true,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
|
|
}
|
|
parts[i] = CompletePart{
|
|
ETag: pi.ETag,
|
|
PartNumber: pi.PartNumber,
|
|
}
|
|
}
|
|
_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
|
|
DataMovement: true,
|
|
MTime: oi.ModTime,
|
|
NoAuditLog: true,
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
hr, err := hash.NewReader(ctx, gr, oi.Size, "", "", actualSize)
|
|
if err != nil {
|
|
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
|
|
}
|
|
|
|
_, err = z.PutObject(ctx,
|
|
bucket,
|
|
oi.Name,
|
|
NewPutObjReader(hr),
|
|
ObjectOptions{
|
|
SrcPoolIdx: poolIdx,
|
|
DataMovement: true,
|
|
VersionID: oi.VersionID,
|
|
MTime: oi.ModTime,
|
|
UserDefined: oi.UserDefined,
|
|
PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
|
|
IndexCB: func() []byte {
|
|
return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
|
|
},
|
|
NoAuditLog: true,
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (z *erasureServerPools) StartRebalance() {
|
|
z.rebalMu.Lock()
|
|
if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do
|
|
z.rebalMu.Unlock()
|
|
return
|
|
}
|
|
ctx, cancel := context.WithCancel(GlobalContext)
|
|
z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called
|
|
z.rebalMu.Unlock()
|
|
|
|
z.rebalMu.RLock()
|
|
participants := make([]bool, len(z.rebalMeta.PoolStats))
|
|
for i, ps := range z.rebalMeta.PoolStats {
|
|
// skip pools which have completed rebalancing
|
|
if ps.Info.Status != rebalStarted {
|
|
continue
|
|
}
|
|
|
|
participants[i] = ps.Participating
|
|
}
|
|
z.rebalMu.RUnlock()
|
|
|
|
for poolIdx, doRebalance := range participants {
|
|
if !doRebalance {
|
|
continue
|
|
}
|
|
// nothing to do if this node is not pool's first node (i.e pool's rebalance 'leader').
|
|
if !globalEndpoints[poolIdx].Endpoints[0].IsLocal {
|
|
continue
|
|
}
|
|
|
|
go func(idx int) {
|
|
stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
|
|
err := z.rebalanceBuckets(ctx, idx)
|
|
stopfn(0, err)
|
|
}(poolIdx)
|
|
}
|
|
}
|
|
|
|
// StopRebalance signals the rebalance goroutine running on this node (if any)
|
|
// to stop, using the context.CancelFunc(s) saved at the time ofStartRebalance.
|
|
func (z *erasureServerPools) StopRebalance() error {
|
|
z.rebalMu.Lock()
|
|
defer z.rebalMu.Unlock()
|
|
|
|
r := z.rebalMeta
|
|
if r == nil { // rebalance not running in this node, nothing to do
|
|
return nil
|
|
}
|
|
|
|
if cancel := r.cancel; cancel != nil {
|
|
// cancel != nil only on pool leaders
|
|
r.cancel = nil
|
|
cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// for rebalance trace support
|
|
type rebalanceMetrics struct{}
|
|
|
|
var globalRebalanceMetrics rebalanceMetrics
|
|
|
|
//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE
|
|
type rebalanceMetric uint8
|
|
|
|
const (
|
|
rebalanceMetricRebalanceBuckets rebalanceMetric = iota
|
|
rebalanceMetricRebalanceBucket
|
|
rebalanceMetricRebalanceObject
|
|
rebalanceMetricRebalanceRemoveObject
|
|
rebalanceMetricSaveMetadata
|
|
)
|
|
|
|
var errDataMovementSrcDstPoolSame = errors.New("source and destination pool are the same")
|
|
|
|
func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string, sz int64) madmin.TraceInfo {
|
|
var errStr string
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
return madmin.TraceInfo{
|
|
TraceType: madmin.TraceRebalance,
|
|
Time: startTime,
|
|
NodeName: globalLocalNodeName,
|
|
FuncName: fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
|
|
Duration: duration,
|
|
Path: path,
|
|
Error: errStr,
|
|
Bytes: sz,
|
|
}
|
|
}
|
|
|
|
func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(sz int64, err error) {
|
|
startTime := time.Now()
|
|
return func(sz int64, err error) {
|
|
duration := time.Since(startTime)
|
|
if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
|
|
globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " "), sz))
|
|
}
|
|
}
|
|
}
|