// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"math/rand"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/lithammer/shortuuid/v4"
	"github.com/minio/madmin-go/v3"
	"github.com/minio/minio/internal/hash"
	"github.com/minio/minio/internal/logger"
	"github.com/minio/pkg/v2/env"
)

//go:generate msgp -file $GOFILE -unexported

// rebalanceStats contains per-pool rebalance statistics like number of objects,
// versions and bytes rebalanced out of a pool
type rebalanceStats struct {
	InitFreeSpace uint64 `json:"initFreeSpace" msg:"ifs"` // Pool free space at the start of rebalance
	InitCapacity  uint64 `json:"initCapacity" msg:"ic"`   // Pool capacity at the start of rebalance

	Buckets           []string      `json:"buckets" msg:"bus"`           // buckets being rebalanced or to be rebalanced
	RebalancedBuckets []string      `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced
	Bucket            string        `json:"bucket" msg:"bu"`             // Last rebalanced bucket
	Object            string        `json:"object" msg:"ob"`             // Last rebalanced object
	NumObjects        uint64        `json:"numObjects" msg:"no"`         // Number of objects rebalanced
	NumVersions       uint64        `json:"numVersions" msg:"nv"`        // Number of versions rebalanced
	Bytes             uint64        `json:"bytes" msg:"bs"`              // Number of bytes rebalanced
	Participating     bool          `json:"participating" msg:"par"`
	Info              rebalanceInfo `json:"info" msg:"inf"`
}

func (rs *rebalanceStats) update(bucket string, fi FileInfo) {
	if fi.IsLatest {
		rs.NumObjects++
	}

	rs.NumVersions++
	onDiskSz := int64(0)
	if !fi.Deleted {
		onDiskSz = fi.Size * int64(fi.Erasure.DataBlocks+fi.Erasure.ParityBlocks) / int64(fi.Erasure.DataBlocks)
	}
	rs.Bytes += uint64(onDiskSz)
	rs.Bucket = bucket
	rs.Object = fi.Name
}

type rstats []*rebalanceStats

//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE
type rebalStatus uint8

const (
	rebalNone rebalStatus = iota
	rebalStarted
	rebalCompleted
	rebalStopped
	rebalFailed
)

type rebalanceInfo struct {
	StartTime time.Time   `msg:"startTs"` // Time at which rebalance-start was issued
	EndTime   time.Time   `msg:"stopTs"`  // Time at which rebalance operation completed or rebalance-stop was called
	Status    rebalStatus `msg:"status"`  // Current state of rebalance operation. One of Started|Stopped|Completed|Failed.
}

// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
type rebalanceMeta struct {
	cancel          context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop
	lastRefreshedAt time.Time          `msg:"-"`
	StoppedAt       time.Time          `msg:"stopTs"` // Time when rebalance-stop was issued.
	ID              string             `msg:"id"`     // ID of the ongoing rebalance operation
	PercentFreeGoal float64            `msg:"pf"`     // Computed from total free space and capacity at the start of rebalance
	PoolStats       []*rebalanceStats  `msg:"rss"`    // Per-pool rebalance stats keyed by pool index
}

var errRebalanceNotStarted = errors.New("rebalance not started")

func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
	r := &rebalanceMeta{}
	err := r.load(ctx, z.serverPools[0])
	if err != nil {
		if errors.Is(err, errConfigNotFound) {
			return nil
		}
		return err
	}

	z.rebalMu.Lock()
	z.rebalMeta = r
	z.rebalMu.Unlock()

	return nil
}

// initRebalanceMeta initializes rebalance metadata for a new rebalance
// operation and saves it in the object store.
func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
	r := &rebalanceMeta{
		ID:        shortuuid.New(),
		PoolStats: make([]*rebalanceStats, len(z.serverPools)),
	}

	// Fetch disk capacity and available space.
	si := z.StorageInfo(ctx)
	diskStats := make([]struct {
		AvailableSpace uint64
		TotalSpace     uint64
	}, len(z.serverPools))
	var totalCap, totalFree uint64
	for _, disk := range si.Disks {
		// Ignore invalid.
		if disk.PoolIndex < 0 || len(diskStats) <= disk.PoolIndex {
			// https://github.com/minio/minio/issues/16500
			continue
		}
		totalCap += disk.TotalSpace
		totalFree += disk.AvailableSpace

		diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
		diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
	}
	r.PercentFreeGoal = float64(totalFree) / float64(totalCap)

	now := time.Now()
	for idx := range z.serverPools {
		r.PoolStats[idx] = &rebalanceStats{
			Buckets:           make([]string, len(buckets)),
			RebalancedBuckets: make([]string, 0, len(buckets)),
			InitFreeSpace:     diskStats[idx].AvailableSpace,
			InitCapacity:      diskStats[idx].TotalSpace,
		}
		copy(r.PoolStats[idx].Buckets, buckets)

		if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal {
			r.PoolStats[idx].Participating = true
			r.PoolStats[idx].Info = rebalanceInfo{
				StartTime: now,
				Status:    rebalStarted,
			}
		}
	}

	err = r.save(ctx, z.serverPools[0])
	if err != nil {
		return arn, err
	}

	z.rebalMeta = r
	return r.ID, nil
}

func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, fi FileInfo) {
	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	r := z.rebalMeta
	if r == nil {
		return
	}

	r.PoolStats[poolIdx].update(bucket, fi)
}

const (
	rebalMetaName = "rebalance.bin"
	rebalMetaFmt  = 1
	rebalMetaVer  = 1
)

func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
	z.rebalMu.RLock()
	defer z.rebalMu.RUnlock()

	r := z.rebalMeta
	if r == nil {
		return "", false
	}

	ps := r.PoolStats[poolIdx]
	if ps == nil {
		return "", false
	}

	if ps.Info.Status == rebalCompleted || !ps.Participating {
		return "", false
	}

	if len(ps.Buckets) == 0 {
		return "", false
	}

	return ps.Buckets[0], true
}

func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	ps := z.rebalMeta.PoolStats[poolIdx]
	if ps == nil {
		return
	}

	for i, b := range ps.Buckets {
		if b == bucket {
			ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
			ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
			break
		}
	}
}

func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
	return r.loadWithOpts(ctx, store, ObjectOptions{})
}

func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
	data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
	if err != nil {
		return err
	}

	if len(data) == 0 {
		return nil
	}
	if len(data) <= 4 {
		return fmt.Errorf("rebalanceMeta: no data")
	}

	// Read header
	switch binary.LittleEndian.Uint16(data[0:2]) {
	case rebalMetaFmt:
	default:
		return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
	}
	switch binary.LittleEndian.Uint16(data[2:4]) {
	case rebalMetaVer:
	default:
		return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
	}

	// OK, parse data.
	if _, err = r.UnmarshalMsg(data[4:]); err != nil {
		return err
	}

	r.lastRefreshedAt = time.Now()

	return nil
}

func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
	data := make([]byte, 4, r.Msgsize()+4)

	// Initialize the header.
	binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt)
	binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer)

	buf, err := r.MarshalMsg(data)
	if err != nil {
		return err
	}

	return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts)
}

func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
	return r.saveWithOpts(ctx, store, ObjectOptions{})
}

func (z *erasureServerPools) IsRebalanceStarted() bool {
	z.rebalMu.RLock()
	defer z.rebalMu.RUnlock()

	if r := z.rebalMeta; r != nil {
		if r.StoppedAt.IsZero() {
			return true
		}
	}
	return false
}

func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
	z.rebalMu.RLock()
	defer z.rebalMu.RUnlock()

	if r := z.rebalMeta; r != nil {
		if !r.StoppedAt.IsZero() {
			return false
		}
		ps := z.rebalMeta.PoolStats[poolIndex]
		return ps.Participating && ps.Info.Status == rebalStarted
	}
	return false
}

func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
	doneCh := make(chan struct{})
	defer close(doneCh)

	// Save rebalance.bin periodically.
	go func() {
		// Update rebalance.bin periodically once every 5-10s, chosen randomly
		// to avoid multiple pool leaders herding to update around the same
		// time.
		r := rand.New(rand.NewSource(time.Now().UnixNano()))
		randSleepFor := func() time.Duration {
			return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64())
		}

		timer := time.NewTimer(randSleepFor())
		defer timer.Stop()
		var rebalDone bool
		var traceMsg string

		for {
			select {
			case <-doneCh:
				// rebalance completed for poolIdx
				now := time.Now()
				z.rebalMu.Lock()
				z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted
				z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
				z.rebalMu.Unlock()

				rebalDone = true
				traceMsg = fmt.Sprintf("completed at %s", now)

			case <-ctx.Done():

				// rebalance stopped for poolIdx
				now := time.Now()
				z.rebalMu.Lock()
				z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped
				z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
				z.rebalMeta.cancel = nil // remove the already used context.CancelFunc
				z.rebalMu.Unlock()

				rebalDone = true
				traceMsg = fmt.Sprintf("stopped at %s", now)

			case <-timer.C:
				traceMsg = fmt.Sprintf("saved at %s", time.Now())
			}

			stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
			err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats)
			stopFn(err)
			logger.LogIf(ctx, err)
			timer.Reset(randSleepFor())

			if rebalDone {
				return
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		bucket, ok := z.nextRebalBucket(poolIdx)
		if !ok {
			// no more buckets to rebalance or target free_space/capacity reached
			break
		}

		stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
		err = z.rebalanceBucket(ctx, bucket, poolIdx)
		if err != nil {
			stopFn(err)
			logger.LogIf(ctx, err)
			return
		}
		stopFn(nil)
		z.bucketRebalanceDone(bucket, poolIdx)
	}

	return err
}

func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	// check if enough objects have been rebalanced
	r := z.rebalMeta
	poolStats := r.PoolStats[poolIdx]
	if poolStats.Info.Status == rebalCompleted {
		return true
	}

	pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity)
	// Mark pool rebalance as done if within 5% from PercentFreeGoal.
	if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 {
		r.PoolStats[poolIdx].Info.Status = rebalCompleted
		r.PoolStats[poolIdx].Info.EndTime = time.Now()
		return true
	}

	return false
}

// rebalanceBucket rebalances objects under bucket in poolIdx pool
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error {
	ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
	vc, _ := globalBucketVersioningSys.Get(bucket)
	// Check if the current bucket has a configured lifecycle policy
	lc, _ := globalLifecycleSys.Get(bucket)
	// Check if bucket is object locked.
	lr, _ := globalBucketObjectLockSys.Get(bucket)

	pool := z.serverPools[poolIdx]
	const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
	wStr := env.Get(envRebalanceWorkers, strconv.Itoa(len(pool.sets)))
	workerSize, err := strconv.Atoi(wStr)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("invalid %s value: %s err: %v, defaulting to %d", envRebalanceWorkers, wStr, err, len(pool.sets)))
		workerSize = len(pool.sets)
	}
	workers := make(chan struct{}, workerSize)
	var wg sync.WaitGroup
	for _, set := range pool.sets {
		set := set
		disks := set.getOnlineDisks()
		if len(disks) == 0 {
			logger.LogIf(ctx, fmt.Errorf("no online disks found for set with endpoints %s",
				set.getEndpoints()))
			continue
		}

		filterLifecycle := func(bucket, object string, fi FileInfo) bool {
			if lc == nil {
				return false
			}
			versioned := vc != nil && vc.Versioned(object)
			objInfo := fi.ToObjectInfo(bucket, object, versioned)

			evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
			if evt.Action.Delete() {
				globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_Rebal)
				return true
			}

			return false
		}

		rebalanceEntry := func(entry metaCacheEntry) {
			defer func() {
				<-workers
				wg.Done()
			}()

			if entry.isDir() {
				return
			}

			// rebalance on poolIdx has reached its goal
			if z.checkIfRebalanceDone(poolIdx) {
				return
			}

			fivs, err := entry.fileInfoVersions(bucket)
			if err != nil {
				return
			}

			// We need a reversed order for rebalance,
			// to create the appropriate stack.
			versionsSorter(fivs.Versions).reverse()

			var rebalanced, expired int
			for _, version := range fivs.Versions {
				// Skip transitioned objects for now. TBD
				if version.IsRemote() {
					continue
				}

				// Apply lifecycle rules on the objects that are expired.
				if filterLifecycle(bucket, version.Name, version) {
					rebalanced++
					expired++
					continue
				}

				// any object with only single DEL marker we don't need
				// to rebalance, just skip it, this also includes
				// any other versions that have already expired.
				remainingVersions := len(fivs.Versions) - expired
				if version.Deleted && remainingVersions == 1 {
					rebalanced++
					continue
				}

				versionID := version.VersionID
				if versionID == "" {
					versionID = nullVersionID
				}

				if version.Deleted {
					_, err := z.DeleteObject(ctx,
						bucket,
						version.Name,
						ObjectOptions{
							Versioned:         true,
							VersionID:         versionID,
							MTime:             version.ModTime,
							DeleteReplication: version.ReplicationState,
							DeleteMarker:      true, // make sure we create a delete marker
							SkipRebalancing:   true, // make sure we skip the decommissioned pool
						})
					var failure bool
					if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
						logger.LogIf(ctx, err)
						failure = true
					}

					if !failure {
						z.updatePoolStats(poolIdx, bucket, version)
						rebalanced++
					}
					continue
				}

				var failure, ignore bool
				for try := 0; try < 3; try++ {
					// GetObjectReader.Close is called by rebalanceObject
					stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID)
					gr, err := set.GetObjectNInfo(ctx,
						bucket,
						encodeDirObject(version.Name),
						nil,
						http.Header{},
						ObjectOptions{
							VersionID:    versionID,
							NoDecryption: true,
							NoLock:       true,
						})
					if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
						// object deleted by the application, nothing to do here we move on.
						ignore = true
						stopFn(nil)
						break
					}
					if err != nil {
						failure = true
						logger.LogIf(ctx, err)
						stopFn(err)
						continue
					}

					if err = z.rebalanceObject(ctx, bucket, gr); err != nil {
						failure = true
						logger.LogIf(ctx, err)
						stopFn(err)
						continue
					}

					stopFn(nil)
					failure = false
					break
				}
				if ignore {
					continue
				}
				if failure {
					break // break out on first error
				}
				z.updatePoolStats(poolIdx, bucket, version)
				rebalanced++
			}

			// if all versions were rebalanced, we can delete the object versions.
			if rebalanced == len(fivs.Versions) {
				stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name)
				_, err := set.DeleteObject(ctx,
					bucket,
					encodeDirObject(entry.name),
					ObjectOptions{
						DeletePrefix: true, // use prefix delete to delete all versions at once.
					},
				)
				stopFn(err)
				auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
				if err != nil {
					logger.LogIf(ctx, err)
				}
			}
		}

		wg.Add(1)
		go func() {
			defer wg.Done()

			// How to resolve partial results.
			resolver := metadataResolutionParams{
				dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
				objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
				bucket:    bucket,
			}
			err := listPathRaw(ctx, listPathRawOptions{
				disks:          disks,
				bucket:         bucket,
				recursive:      true,
				forwardTo:      "",
				minDisks:       len(disks) / 2, // to capture all quorum ratios
				reportNotFound: false,
				agreed: func(entry metaCacheEntry) {
					workers <- struct{}{}
					wg.Add(1)
					go rebalanceEntry(entry)
				},
				partial: func(entries metaCacheEntries, _ []error) {
					entry, ok := entries.resolve(&resolver)
					if ok {
						workers <- struct{}{}
						wg.Add(1)
						go rebalanceEntry(*entry)
					}
				},
				finished: nil,
			})
			logger.LogIf(ctx, err)
		}()
	}
	wg.Wait()
	return nil
}

type rebalSaveOpts uint8

const (
	rebalSaveStats rebalSaveOpts = iota
	rebalSaveStoppedAt
)

func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error {
	lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
	lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
		return err
	}
	defer lock.Unlock(lkCtx)

	ctx = lkCtx.Context()
	noLockOpts := ObjectOptions{NoLock: true}
	r := &rebalanceMeta{}
	if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil {
		return err
	}

	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	switch opts {
	case rebalSaveStoppedAt:
		r.StoppedAt = time.Now()
	case rebalSaveStats:
		r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx]
	}
	z.rebalMeta = r

	err = z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
	return err
}

func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
	errStr := ""
	if err != nil {
		errStr = err.Error()
	}
	auditLogInternal(ctx, AuditLogOptions{
		Event:     "rebalance",
		APIName:   apiName,
		Bucket:    bucket,
		Object:    object,
		VersionID: versionID,
		Error:     errStr,
	})
}

func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
	oi := gr.ObjInfo

	defer func() {
		gr.Close()
		auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
	}()

	actualSize, err := oi.GetActualSize()
	if err != nil {
		return err
	}

	if oi.isMultipart() {
		res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
			VersionID:   oi.VersionID,
			UserDefined: oi.UserDefined,
		})
		if err != nil {
			return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
		}
		defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{})

		parts := make([]CompletePart, len(oi.Parts))
		for i, part := range oi.Parts {
			hr, err := hash.NewReader(io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
			if err != nil {
				return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
			}
			pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
				part.Number,
				NewPutObjReader(hr),
				ObjectOptions{
					PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
					IndexCB: func() []byte {
						return part.Index // Preserve part Index to ensure decompression works.
					},
				})
			if err != nil {
				return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
			}
			parts[i] = CompletePart{
				ETag:       pi.ETag,
				PartNumber: pi.PartNumber,
			}
		}
		_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
			MTime: oi.ModTime,
		})
		if err != nil {
			err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
		}
		return err
	}

	hr, err := hash.NewReader(gr, oi.Size, "", "", actualSize)
	if err != nil {
		return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
	}
	_, err = z.PutObject(ctx,
		bucket,
		oi.Name,
		NewPutObjReader(hr),
		ObjectOptions{
			VersionID:    oi.VersionID,
			MTime:        oi.ModTime,
			UserDefined:  oi.UserDefined,
			PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
			IndexCB: func() []byte {
				return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
			},
		})
	if err != nil {
		err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
	}
	return err
}

func (z *erasureServerPools) StartRebalance() {
	z.rebalMu.Lock()
	if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do
		z.rebalMu.Unlock()
		return
	}
	ctx, cancel := context.WithCancel(GlobalContext)
	z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called
	z.rebalMu.Unlock()

	z.rebalMu.RLock()
	participants := make([]bool, len(z.rebalMeta.PoolStats))
	for i, ps := range z.rebalMeta.PoolStats {
		// skip pools which have completed rebalancing
		if ps.Info.Status != rebalStarted {
			continue
		}

		participants[i] = ps.Participating
	}
	z.rebalMu.RUnlock()

	for poolIdx, doRebalance := range participants {
		if !doRebalance {
			continue
		}
		// nothing to do if this node is not pool's first node (i.e pool's rebalance 'leader').
		if !globalEndpoints[poolIdx].Endpoints[0].IsLocal {
			continue
		}

		go func(idx int) {
			stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
			err := z.rebalanceBuckets(ctx, idx)
			stopfn(err)
		}(poolIdx)
	}
}

// StopRebalance signals the rebalance goroutine running on this node (if any)
// to stop, using the context.CancelFunc(s) saved at the time ofStartRebalance.
func (z *erasureServerPools) StopRebalance() error {
	z.rebalMu.Lock()
	defer z.rebalMu.Unlock()

	r := z.rebalMeta
	if r == nil { // rebalance not running in this node, nothing to do
		return nil
	}

	if cancel := r.cancel; cancel != nil {
		// cancel != nil only on pool leaders
		r.cancel = nil
		cancel()
	}
	return nil
}

// for rebalance trace support
type rebalanceMetrics struct{}

var globalRebalanceMetrics rebalanceMetrics

//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE
type rebalanceMetric uint8

const (
	rebalanceMetricRebalanceBuckets rebalanceMetric = iota
	rebalanceMetricRebalanceBucket
	rebalanceMetricRebalanceObject
	rebalanceMetricRebalanceRemoveObject
	rebalanceMetricSaveMetadata
)

func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo {
	var errStr string
	if err != nil {
		errStr = err.Error()
	}
	return madmin.TraceInfo{
		TraceType: madmin.TraceRebalance,
		Time:      startTime,
		NodeName:  globalLocalNodeName,
		FuncName:  fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
		Duration:  duration,
		Path:      path,
		Error:     errStr,
	}
}

func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) {
	startTime := time.Now()
	return func(err error) {
		duration := time.Since(startTime)
		if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
			globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " ")))
		}
	}
}