mirror of https://github.com/minio/minio.git
feat: introduce pool-level rebalance (#15483)
This commit is contained in:
parent
ce8456a1a9
commit
4523da6543
|
@ -124,6 +124,18 @@ func toAdminAPIErr(ctx context.Context, err error) APIError {
|
|||
Description: err.Error(),
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
}
|
||||
case errors.Is(err, errDecommissionRebalanceAlreadyRunning):
|
||||
apiErr = APIError{
|
||||
Code: "XMinioDecommissionNotAllowed",
|
||||
Description: err.Error(),
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
}
|
||||
case errors.Is(err, errRebalanceDecommissionAlreadyRunning):
|
||||
apiErr = APIError{
|
||||
Code: "XMinioRebalanceNotAllowed",
|
||||
Description: err.Error(),
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
}
|
||||
case errors.Is(err, errConfigNotFound):
|
||||
apiErr = APIError{
|
||||
Code: "XMinioConfigError",
|
||||
|
|
|
@ -19,6 +19,7 @@ package cmd
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
|
@ -27,6 +28,11 @@ import (
|
|||
iampolicy "github.com/minio/pkg/iam/policy"
|
||||
)
|
||||
|
||||
var (
|
||||
errRebalanceDecommissionAlreadyRunning = errors.New("Rebalance cannot be started, decommission is aleady in progress")
|
||||
errDecommissionRebalanceAlreadyRunning = errors.New("Decommission cannot be started, rebalance is already in progress")
|
||||
)
|
||||
|
||||
func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "StartDecommission")
|
||||
|
||||
|
@ -49,6 +55,11 @@ func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Reque
|
|||
return
|
||||
}
|
||||
|
||||
if pools.IsRebalanceStarted() {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errDecommissionRebalanceAlreadyRunning), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
v := vars["pool"]
|
||||
|
||||
|
@ -200,3 +211,137 @@ func (a adminAPIHandlers) ListPools(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
logger.LogIf(r.Context(), json.NewEncoder(w).Encode(poolsStatus))
|
||||
}
|
||||
|
||||
func (a adminAPIHandlers) RebalanceStart(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "RebalanceStart")
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.RebalanceAdminAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// NB rebalance-start admin API is always coordinated from first pool's
|
||||
// first node. The following is required to serialize (the effects of)
|
||||
// concurrent rebalance-start commands.
|
||||
if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal {
|
||||
for nodeIdx, proxyEp := range globalProxyEndpoints {
|
||||
if proxyEp.Endpoint.Host == ep.Host {
|
||||
if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pools, ok := objectAPI.(*erasureServerPools)
|
||||
if !ok || len(pools.serverPools) == 1 {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if pools.IsDecommissionRunning() {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errRebalanceDecommissionAlreadyRunning), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if pools.IsRebalanceStarted() {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminRebalanceAlreadyStarted), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
bucketInfos, err := objectAPI.ListBuckets(ctx, BucketOptions{})
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
buckets := make([]string, 0, len(bucketInfos))
|
||||
for _, bInfo := range bucketInfos {
|
||||
buckets = append(buckets, bInfo.Name)
|
||||
}
|
||||
|
||||
var id string
|
||||
if id, err = pools.initRebalanceMeta(ctx, buckets); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Rebalance routine is run on the first node of any pool participating in rebalance.
|
||||
pools.StartRebalance()
|
||||
|
||||
b, err := json.Marshal(struct {
|
||||
ID string `json:"id"`
|
||||
}{ID: id})
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccessResponseJSON(w, b)
|
||||
// Notify peers to load rebalance.bin and start rebalance routine if they happen to be
|
||||
// participating pool's leader node
|
||||
globalNotificationSys.LoadRebalanceMeta(ctx, true)
|
||||
}
|
||||
|
||||
func (a adminAPIHandlers) RebalanceStatus(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "RebalanceStatus")
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.RebalanceAdminAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Proxy rebalance-status to first pool first node, so that users see a
|
||||
// consistent view of rebalance progress even though different rebalancing
|
||||
// pools may temporarily have out of date info on the others.
|
||||
if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal {
|
||||
for nodeIdx, proxyEp := range globalProxyEndpoints {
|
||||
if proxyEp.Endpoint.Host == ep.Host {
|
||||
if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pools, ok := objectAPI.(*erasureServerPools)
|
||||
if !ok {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
rs, err := rebalanceStatus(ctx, pools)
|
||||
if err != nil {
|
||||
if errors.Is(err, errRebalanceNotStarted) {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminRebalanceNotStarted), r.URL)
|
||||
return
|
||||
}
|
||||
logger.LogIf(ctx, fmt.Errorf("failed to fetch rebalance status: %w", err))
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
logger.LogIf(r.Context(), json.NewEncoder(w).Encode(rs))
|
||||
}
|
||||
|
||||
func (a adminAPIHandlers) RebalanceStop(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "RebalanceStop")
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.RebalanceAdminAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
pools, ok := objectAPI.(*erasureServerPools)
|
||||
if !ok {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Cancel any ongoing rebalance operation
|
||||
globalNotificationSys.StopRebalance(r.Context())
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
logger.LogIf(ctx, pools.saveRebalanceStats(GlobalContext, 0, rebalSaveStoppedAt))
|
||||
}
|
||||
|
|
|
@ -84,6 +84,11 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
|
|||
|
||||
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/decommission").HandlerFunc(gz(httpTraceAll(adminAPI.StartDecommission))).Queries("pool", "{pool:.*}")
|
||||
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/cancel").HandlerFunc(gz(httpTraceAll(adminAPI.CancelDecommission))).Queries("pool", "{pool:.*}")
|
||||
|
||||
// Rebalance operations
|
||||
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/rebalance/start").HandlerFunc(gz(httpTraceAll(adminAPI.RebalanceStart)))
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/rebalance/status").HandlerFunc(gz(httpTraceAll(adminAPI.RebalanceStatus)))
|
||||
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/rebalance/stop").HandlerFunc(gz(httpTraceAll(adminAPI.RebalanceStop)))
|
||||
}
|
||||
|
||||
// Profiling operations - deprecated API
|
||||
|
|
|
@ -291,6 +291,10 @@ const (
|
|||
ErrSiteReplicationIAMError
|
||||
ErrSiteReplicationConfigMissing
|
||||
|
||||
// Pool rebalance errors
|
||||
ErrAdminRebalanceAlreadyStarted
|
||||
ErrAdminRebalanceNotStarted
|
||||
|
||||
// Bucket Quota error codes
|
||||
ErrAdminBucketQuotaExceeded
|
||||
ErrAdminNoSuchQuotaConfiguration
|
||||
|
@ -1397,6 +1401,16 @@ var errorCodes = errorCodeMap{
|
|||
Description: "Site not found in site replication configuration",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrAdminRebalanceAlreadyStarted: {
|
||||
Code: "XMinioAdminRebalanceAlreadyStarted",
|
||||
Description: "Pool rebalance is already started",
|
||||
HTTPStatusCode: http.StatusConflict,
|
||||
},
|
||||
ErrAdminRebalanceNotStarted: {
|
||||
Code: "XMinioAdminRebalanceNotStarted",
|
||||
Description: "Pool rebalance is not started",
|
||||
HTTPStatusCode: http.StatusNotFound,
|
||||
},
|
||||
ErrMaximumExpires: {
|
||||
Code: "AuthorizationQueryParametersError",
|
||||
Description: "X-Amz-Expires must be less than a week (in seconds); that is, the given X-Amz-Expires must be less than 604800 seconds",
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -29,10 +29,14 @@ import (
|
|||
|
||||
var errConfigNotFound = errors.New("config file not found")
|
||||
|
||||
func readConfigWithMetadata(ctx context.Context, store objectIO, configFile string) ([]byte, ObjectInfo, error) {
|
||||
r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, readLock, ObjectOptions{})
|
||||
func readConfigWithMetadata(ctx context.Context, store objectIO, configFile string, opts ObjectOptions) ([]byte, ObjectInfo, error) {
|
||||
lockType := readLock
|
||||
if opts.NoLock {
|
||||
lockType = noLock // erasureObjects.GetObjectNInfo honors lockType argument but not opts.NoLock.
|
||||
}
|
||||
|
||||
r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, lockType, opts)
|
||||
if err != nil {
|
||||
// Treat object not found as config not found.
|
||||
if isErrObjectNotFound(err) {
|
||||
return nil, ObjectInfo{}, errConfigNotFound
|
||||
}
|
||||
|
@ -52,7 +56,7 @@ func readConfigWithMetadata(ctx context.Context, store objectIO, configFile stri
|
|||
}
|
||||
|
||||
func readConfig(ctx context.Context, store objectIO, configFile string) ([]byte, error) {
|
||||
buf, _, err := readConfigWithMetadata(ctx, store, configFile)
|
||||
buf, _, err := readConfigWithMetadata(ctx, store, configFile, ObjectOptions{})
|
||||
return buf, err
|
||||
}
|
||||
|
||||
|
@ -70,16 +74,20 @@ func deleteConfig(ctx context.Context, objAPI objectDeleter, configFile string)
|
|||
return err
|
||||
}
|
||||
|
||||
func saveConfig(ctx context.Context, store objectIO, configFile string, data []byte) error {
|
||||
func saveConfigWithOpts(ctx context.Context, store objectIO, configFile string, data []byte, opts ObjectOptions) error {
|
||||
hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = store.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), ObjectOptions{MaxParity: true})
|
||||
_, err = store.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), opts)
|
||||
return err
|
||||
}
|
||||
|
||||
func saveConfig(ctx context.Context, store objectIO, configFile string, data []byte) error {
|
||||
return saveConfigWithOpts(ctx, store, configFile, data, ObjectOptions{MaxParity: true})
|
||||
}
|
||||
|
||||
func checkConfig(ctx context.Context, objAPI ObjectLayer, configFile string) error {
|
||||
if _, err := objAPI.GetObjectInfo(ctx, minioMetaBucket, configFile, ObjectOptions{}); err != nil {
|
||||
// Treat object not found as config not found.
|
||||
|
|
|
@ -499,6 +499,15 @@ const (
|
|||
// Init() initializes pools and saves additional information about them
|
||||
// in 'pool.bin', this is eventually used for decommissioning the pool.
|
||||
func (z *erasureServerPools) Init(ctx context.Context) error {
|
||||
// Load rebalance metadata if present
|
||||
err := z.loadRebalanceMeta(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load rebalance data: %w", err)
|
||||
}
|
||||
|
||||
// Start rebalance routine
|
||||
z.StartRebalance()
|
||||
|
||||
meta := poolMeta{}
|
||||
|
||||
if err := meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
|
||||
|
@ -573,6 +582,19 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) IsDecommissionRunning() bool {
|
||||
z.poolMetaMutex.RLock()
|
||||
defer z.poolMetaMutex.RUnlock()
|
||||
meta := z.poolMeta
|
||||
for _, pool := range meta.Pools {
|
||||
if pool.Decommission != nil {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
|
||||
objInfo := gr.ObjInfo
|
||||
|
||||
|
|
|
@ -0,0 +1,876 @@
|
|||
// Copyright (c) 2015-2022 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lithammer/shortuuid/v4"
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/env"
|
||||
)
|
||||
|
||||
//go:generate msgp -file $GOFILE -unexported
|
||||
|
||||
// rebalanceStats contains per-pool rebalance statistics like number of objects,
|
||||
// versions and bytes rebalanced out of a pool
|
||||
type rebalanceStats struct {
|
||||
InitFreeSpace uint64 `json:"initFreeSpace" msg:"ifs"` // Pool free space at the start of rebalance
|
||||
InitCapacity uint64 `json:"initCapacity" msg:"ic"` // Pool capacity at the start of rebalance
|
||||
|
||||
Buckets []string `json:"buckets" msg:"bus"` // buckets being rebalanced or to be rebalanced
|
||||
RebalancedBuckets []string `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced
|
||||
Bucket string `json:"bucket" msg:"bu"` // Last rebalanced bucket
|
||||
Object string `json:"object" msg:"ob"` // Last rebalanced object
|
||||
NumObjects uint64 `json:"numObjects" msg:"no"` // Number of objects rebalanced
|
||||
NumVersions uint64 `json:"numVersions" msg:"nv"` // Number of versions rebalanced
|
||||
Bytes uint64 `json:"bytes" msg:"bs"` // Number of bytes rebalanced
|
||||
Participating bool `json:"participating" msg:"par"`
|
||||
Info rebalanceInfo `json:"info" msg:"inf"`
|
||||
}
|
||||
|
||||
func (rs *rebalanceStats) update(bucket string, oi ObjectInfo) {
|
||||
if oi.IsLatest {
|
||||
rs.NumObjects++
|
||||
}
|
||||
|
||||
rs.NumVersions++
|
||||
rs.Bytes += uint64(oi.Size)
|
||||
rs.Bucket = bucket
|
||||
rs.Object = oi.Name
|
||||
}
|
||||
|
||||
type rstats []*rebalanceStats
|
||||
|
||||
//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE
|
||||
type rebalStatus uint8
|
||||
|
||||
const (
|
||||
rebalNone rebalStatus = iota
|
||||
rebalStarted
|
||||
rebalCompleted
|
||||
rebalStopped
|
||||
rebalFailed
|
||||
)
|
||||
|
||||
type rebalanceInfo struct {
|
||||
StartTime time.Time `msg:"startTs"` // Time at which rebalance-start was issued
|
||||
EndTime time.Time `msg:"stopTs"` // Time at which rebalance operation completed or rebalance-stop was called
|
||||
Status rebalStatus `msg:"status"` // Current state of rebalance operation. One of Started|Stopped|Completed|Failed.
|
||||
}
|
||||
|
||||
// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
|
||||
type rebalanceMeta struct {
|
||||
cancel context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop
|
||||
lastRefreshedAt time.Time `msg:"-"`
|
||||
StoppedAt time.Time `msg:"stopTs"` // Time when rebalance-stop was issued.
|
||||
ID string `msg:"id"` // ID of the ongoing rebalance operation
|
||||
PercentFreeGoal float64 `msg:"pf"` // Computed from total free space and capacity at the start of rebalance
|
||||
PoolStats []*rebalanceStats `msg:"rss"` // Per-pool rebalance stats keyed by pool index
|
||||
}
|
||||
|
||||
var errRebalanceNotStarted = errors.New("rebalance not started")
|
||||
|
||||
func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
|
||||
r := &rebalanceMeta{}
|
||||
err := r.load(ctx, z.serverPools[0])
|
||||
if err != nil {
|
||||
if errors.Is(err, errConfigNotFound) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
z.rebalMu.Lock()
|
||||
z.rebalMeta = r
|
||||
z.rebalMu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// initRebalanceMeta initializes rebalance metadata for a new rebalance
|
||||
// operation and saves it in the object store.
|
||||
func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
|
||||
r := &rebalanceMeta{
|
||||
ID: shortuuid.New(),
|
||||
PoolStats: make([]*rebalanceStats, len(z.serverPools)),
|
||||
}
|
||||
|
||||
// Fetch disk capacity and available space.
|
||||
si, _ := z.StorageInfo(ctx)
|
||||
diskStats := make([]struct {
|
||||
AvailableSpace uint64
|
||||
TotalSpace uint64
|
||||
}, len(z.serverPools))
|
||||
var totalCap, totalFree uint64
|
||||
for _, disk := range si.Disks {
|
||||
totalCap += disk.TotalSpace
|
||||
totalFree += disk.AvailableSpace
|
||||
|
||||
diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
|
||||
diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
|
||||
}
|
||||
r.PercentFreeGoal = float64(totalFree) / float64(totalCap)
|
||||
|
||||
now := time.Now()
|
||||
for idx := range z.serverPools {
|
||||
r.PoolStats[idx] = &rebalanceStats{
|
||||
Buckets: make([]string, len(buckets)),
|
||||
RebalancedBuckets: make([]string, 0, len(buckets)),
|
||||
InitFreeSpace: diskStats[idx].AvailableSpace,
|
||||
InitCapacity: diskStats[idx].TotalSpace,
|
||||
}
|
||||
copy(r.PoolStats[idx].Buckets, buckets)
|
||||
|
||||
if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal {
|
||||
r.PoolStats[idx].Participating = true
|
||||
r.PoolStats[idx].Info = rebalanceInfo{
|
||||
StartTime: now,
|
||||
Status: rebalStarted,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = r.save(ctx, z.serverPools[0])
|
||||
if err != nil {
|
||||
return arn, err
|
||||
}
|
||||
|
||||
z.rebalMeta = r
|
||||
return r.ID, nil
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, oi ObjectInfo) {
|
||||
z.rebalMu.Lock()
|
||||
defer z.rebalMu.Unlock()
|
||||
|
||||
r := z.rebalMeta
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.PoolStats[poolIdx].update(bucket, oi)
|
||||
}
|
||||
|
||||
const (
|
||||
rebalMetaName = "rebalance.bin"
|
||||
rebalMetaFmt = 1
|
||||
rebalMetaVer = 1
|
||||
)
|
||||
|
||||
func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
|
||||
z.rebalMu.RLock()
|
||||
defer z.rebalMu.RUnlock()
|
||||
|
||||
r := z.rebalMeta
|
||||
if r == nil {
|
||||
return "", false
|
||||
}
|
||||
|
||||
ps := r.PoolStats[poolIdx]
|
||||
if ps == nil {
|
||||
return "", false
|
||||
}
|
||||
|
||||
if ps.Info.Status == rebalCompleted || !ps.Participating {
|
||||
return "", false
|
||||
}
|
||||
|
||||
if len(ps.Buckets) == 0 {
|
||||
return "", false
|
||||
}
|
||||
|
||||
return ps.Buckets[0], true
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
|
||||
z.rebalMu.Lock()
|
||||
defer z.rebalMu.Unlock()
|
||||
|
||||
ps := z.rebalMeta.PoolStats[poolIdx]
|
||||
if ps == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for i, b := range ps.Buckets {
|
||||
if b == bucket {
|
||||
ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
|
||||
ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
|
||||
return r.loadWithOpts(ctx, store, ObjectOptions{})
|
||||
}
|
||||
|
||||
func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
|
||||
data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(data) <= 4 {
|
||||
return fmt.Errorf("rebalanceMeta: no data")
|
||||
}
|
||||
|
||||
// Read header
|
||||
switch binary.LittleEndian.Uint16(data[0:2]) {
|
||||
case rebalMetaFmt:
|
||||
default:
|
||||
return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
|
||||
}
|
||||
switch binary.LittleEndian.Uint16(data[2:4]) {
|
||||
case rebalMetaVer:
|
||||
default:
|
||||
return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
|
||||
}
|
||||
|
||||
// OK, parse data.
|
||||
if _, err = r.UnmarshalMsg(data[4:]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.lastRefreshedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
|
||||
data := make([]byte, 4, r.Msgsize()+4)
|
||||
|
||||
// Initialize the header.
|
||||
binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt)
|
||||
binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer)
|
||||
|
||||
buf, err := r.MarshalMsg(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts)
|
||||
}
|
||||
|
||||
func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
|
||||
return r.saveWithOpts(ctx, store, ObjectOptions{})
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) IsRebalanceStarted() bool {
|
||||
z.rebalMu.RLock()
|
||||
defer z.rebalMu.RUnlock()
|
||||
|
||||
if r := z.rebalMeta; r != nil {
|
||||
if r.StoppedAt.IsZero() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
|
||||
z.rebalMu.RLock()
|
||||
defer z.rebalMu.RUnlock()
|
||||
|
||||
if r := z.rebalMeta; r != nil {
|
||||
if !r.StoppedAt.IsZero() {
|
||||
return false
|
||||
}
|
||||
ps := z.rebalMeta.PoolStats[poolIndex]
|
||||
return ps.Participating && ps.Info.Status == rebalStarted
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
|
||||
// Save rebalance.bin periodically.
|
||||
go func() {
|
||||
// Update rebalance.bin periodically once every 5-10s, chosen randomly
|
||||
// to avoid multiple pool leaders herding to update around the same
|
||||
// time.
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
randSleepFor := func() time.Duration {
|
||||
return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64())
|
||||
}
|
||||
|
||||
timer := time.NewTimer(randSleepFor())
|
||||
defer timer.Stop()
|
||||
var rebalDone bool
|
||||
var traceMsg string
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-doneCh:
|
||||
// rebalance completed for poolIdx
|
||||
now := time.Now()
|
||||
z.rebalMu.Lock()
|
||||
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted
|
||||
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
|
||||
z.rebalMu.Unlock()
|
||||
|
||||
rebalDone = true
|
||||
traceMsg = fmt.Sprintf("completed at %s", now)
|
||||
|
||||
case <-ctx.Done():
|
||||
|
||||
// rebalance stopped for poolIdx
|
||||
now := time.Now()
|
||||
z.rebalMu.Lock()
|
||||
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped
|
||||
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
|
||||
z.rebalMu.Unlock()
|
||||
|
||||
rebalDone = true
|
||||
traceMsg = fmt.Sprintf("stopped at %s", now)
|
||||
|
||||
case <-timer.C:
|
||||
traceMsg = fmt.Sprintf("saved at %s", time.Now())
|
||||
}
|
||||
|
||||
stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
|
||||
err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats)
|
||||
stopFn(err)
|
||||
logger.LogIf(ctx, err)
|
||||
timer.Reset(randSleepFor())
|
||||
|
||||
if rebalDone {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
bucket, ok := z.nextRebalBucket(poolIdx)
|
||||
if !ok {
|
||||
// no more buckets to rebalance or target free_space/capacity reached
|
||||
break
|
||||
}
|
||||
|
||||
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
|
||||
err = z.rebalanceBucket(ctx, bucket, poolIdx)
|
||||
if err != nil {
|
||||
stopFn(err)
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
stopFn(nil)
|
||||
z.bucketRebalanceDone(bucket, poolIdx)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
|
||||
z.rebalMu.Lock()
|
||||
defer z.rebalMu.Unlock()
|
||||
|
||||
// check if enough objects have been rebalanced
|
||||
r := z.rebalMeta
|
||||
poolStats := r.PoolStats[poolIdx]
|
||||
if poolStats.Info.Status == rebalCompleted {
|
||||
return true
|
||||
}
|
||||
|
||||
pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity)
|
||||
// Mark pool rebalance as done if within 5% from PercentFreeGoal.
|
||||
if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 {
|
||||
r.PoolStats[poolIdx].Info.Status = rebalCompleted
|
||||
r.PoolStats[poolIdx].Info.EndTime = time.Now()
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// rebalanceBucket rebalances objects under bucket in poolIdx pool
|
||||
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error {
|
||||
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
|
||||
vc, _ := globalBucketVersioningSys.Get(bucket)
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, _ := globalLifecycleSys.Get(bucket)
|
||||
// Check if bucket is object locked.
|
||||
lr, _ := globalBucketObjectLockSys.Get(bucket)
|
||||
|
||||
pool := z.serverPools[poolIdx]
|
||||
const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
|
||||
wStr := env.Get(envRebalanceWorkers, strconv.Itoa(len(pool.sets)))
|
||||
workerSize, err := strconv.Atoi(wStr)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("invalid %s value: %s err: %v, defaulting to %d", envRebalanceWorkers, wStr, err, len(pool.sets)))
|
||||
workerSize = len(pool.sets)
|
||||
}
|
||||
workers := make(chan struct{}, workerSize)
|
||||
var wg sync.WaitGroup
|
||||
for _, set := range pool.sets {
|
||||
set := set
|
||||
disks := set.getOnlineDisks()
|
||||
if len(disks) == 0 {
|
||||
logger.LogIf(ctx, fmt.Errorf("no online disks found for set with endpoints %s",
|
||||
set.getEndpoints()))
|
||||
continue
|
||||
}
|
||||
|
||||
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
|
||||
if lc == nil {
|
||||
return false
|
||||
}
|
||||
versioned := vc != nil && vc.Versioned(object)
|
||||
objInfo := fi.ToObjectInfo(bucket, object, versioned)
|
||||
event := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
|
||||
switch action := event.Action; action {
|
||||
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
|
||||
globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction)
|
||||
// Skip this entry.
|
||||
return true
|
||||
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
|
||||
globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
|
||||
// Skip this entry.
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
rebalanceEntry := func(entry metaCacheEntry) {
|
||||
defer func() {
|
||||
<-workers
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
if entry.isDir() {
|
||||
return
|
||||
}
|
||||
|
||||
// rebalance on poolIdx has reached its goal
|
||||
if z.checkIfRebalanceDone(poolIdx) {
|
||||
return
|
||||
}
|
||||
|
||||
fivs, err := entry.fileInfoVersions(bucket)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// We need a reversed order for rebalance,
|
||||
// to create the appropriate stack.
|
||||
versionsSorter(fivs.Versions).reverse()
|
||||
|
||||
var rebalanced int
|
||||
for _, version := range fivs.Versions {
|
||||
// Skip transitioned objects for now. TBD
|
||||
if version.IsRemote() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Apply lifecycle rules on the objects that are expired.
|
||||
if filterLifecycle(bucket, version.Name, version) {
|
||||
logger.LogIf(ctx, fmt.Errorf("found %s/%s (%s) expired object based on ILM rules, skipping and scheduled for deletion", bucket, version.Name, version.VersionID))
|
||||
continue
|
||||
}
|
||||
|
||||
// We will skip rebalancing delete markers
|
||||
// with single version, its as good as there
|
||||
// is no data associated with the object.
|
||||
if version.Deleted && len(fivs.Versions) == 1 {
|
||||
logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no data to be rebalanced", bucket, version.Name))
|
||||
continue
|
||||
}
|
||||
|
||||
if version.Deleted {
|
||||
_, err := z.DeleteObject(ctx,
|
||||
bucket,
|
||||
version.Name,
|
||||
ObjectOptions{
|
||||
Versioned: vc.PrefixEnabled(version.Name),
|
||||
VersionID: version.VersionID,
|
||||
MTime: version.ModTime,
|
||||
DeleteReplication: version.ReplicationState,
|
||||
DeleteMarker: true, // make sure we create a delete marker
|
||||
SkipRebalancing: true, // make sure we skip the decommissioned pool
|
||||
})
|
||||
var failure bool
|
||||
if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
logger.LogIf(ctx, err)
|
||||
failure = true
|
||||
}
|
||||
|
||||
if !failure {
|
||||
z.updatePoolStats(poolIdx, bucket, version.ToObjectInfo(bucket, version.Name, vc.PrefixEnabled(version.Name)))
|
||||
rebalanced++
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
var failure bool
|
||||
var oi ObjectInfo
|
||||
for try := 0; try < 3; try++ {
|
||||
// GetObjectReader.Close is called by rebalanceObject
|
||||
gr, err := set.GetObjectNInfo(ctx,
|
||||
bucket,
|
||||
encodeDirObject(version.Name),
|
||||
nil,
|
||||
http.Header{},
|
||||
noLock, // all mutations are blocked reads are safe without locks.
|
||||
ObjectOptions{
|
||||
VersionID: version.VersionID,
|
||||
NoDecryption: true,
|
||||
})
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
// object deleted by the application, nothing to do here we move on.
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
failure = true
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name)
|
||||
if err = z.rebalanceObject(ctx, bucket, gr); err != nil {
|
||||
stopFn(err)
|
||||
failure = true
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
stopFn(nil)
|
||||
failure = false
|
||||
oi = gr.ObjInfo
|
||||
break
|
||||
}
|
||||
|
||||
if failure {
|
||||
break // break out on first error
|
||||
}
|
||||
z.updatePoolStats(poolIdx, bucket, oi)
|
||||
rebalanced++
|
||||
}
|
||||
|
||||
// if all versions were rebalanced, we can delete the object versions.
|
||||
if rebalanced == len(fivs.Versions) {
|
||||
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name)
|
||||
_, err := set.DeleteObject(ctx,
|
||||
bucket,
|
||||
encodeDirObject(entry.name),
|
||||
ObjectOptions{
|
||||
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
||||
},
|
||||
)
|
||||
stopFn(err)
|
||||
auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
// How to resolve partial results.
|
||||
resolver := metadataResolutionParams{
|
||||
dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
||||
objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
||||
bucket: bucket,
|
||||
}
|
||||
err := listPathRaw(ctx, listPathRawOptions{
|
||||
disks: disks,
|
||||
bucket: bucket,
|
||||
recursive: true,
|
||||
forwardTo: "",
|
||||
minDisks: len(disks) / 2, // to capture all quorum ratios
|
||||
reportNotFound: false,
|
||||
agreed: func(entry metaCacheEntry) {
|
||||
workers <- struct{}{}
|
||||
wg.Add(1)
|
||||
go rebalanceEntry(entry)
|
||||
},
|
||||
partial: func(entries metaCacheEntries, _ []error) {
|
||||
entry, ok := entries.resolve(&resolver)
|
||||
if ok {
|
||||
workers <- struct{}{}
|
||||
wg.Add(1)
|
||||
go rebalanceEntry(*entry)
|
||||
}
|
||||
},
|
||||
finished: nil,
|
||||
})
|
||||
logger.LogIf(ctx, err)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
type rebalSaveOpts uint8
|
||||
|
||||
const (
|
||||
rebalSaveStats rebalSaveOpts = iota
|
||||
rebalSaveStoppedAt
|
||||
)
|
||||
|
||||
func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error {
|
||||
lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
|
||||
lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
|
||||
return err
|
||||
}
|
||||
defer lock.Unlock(lkCtx.Cancel)
|
||||
|
||||
ctx = lkCtx.Context()
|
||||
noLockOpts := ObjectOptions{NoLock: true}
|
||||
r := &rebalanceMeta{}
|
||||
if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
z.rebalMu.Lock()
|
||||
defer z.rebalMu.Unlock()
|
||||
|
||||
switch opts {
|
||||
case rebalSaveStoppedAt:
|
||||
r.StoppedAt = time.Now()
|
||||
case rebalSaveStats:
|
||||
r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx]
|
||||
}
|
||||
z.rebalMeta = r
|
||||
|
||||
err = z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
|
||||
return err
|
||||
}
|
||||
|
||||
func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
|
||||
errStr := ""
|
||||
if err != nil {
|
||||
errStr = err.Error()
|
||||
}
|
||||
auditLogInternal(ctx, AuditLogOptions{
|
||||
Event: "rebalance",
|
||||
APIName: apiName,
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
VersionID: versionID,
|
||||
Error: errStr,
|
||||
})
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
|
||||
oi := gr.ObjInfo
|
||||
|
||||
defer func() {
|
||||
gr.Close()
|
||||
auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
|
||||
}()
|
||||
|
||||
actualSize, err := oi.GetActualSize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if oi.isMultipart() {
|
||||
res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
|
||||
VersionID: oi.VersionID,
|
||||
MTime: oi.ModTime,
|
||||
UserDefined: oi.UserDefined,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
|
||||
}
|
||||
defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{})
|
||||
|
||||
parts := make([]CompletePart, len(oi.Parts))
|
||||
for i, part := range oi.Parts {
|
||||
hr, err := hash.NewReader(gr, part.Size, "", "", part.ActualSize)
|
||||
if err != nil {
|
||||
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
|
||||
}
|
||||
pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
|
||||
part.Number,
|
||||
NewPutObjReader(hr),
|
||||
ObjectOptions{
|
||||
PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
|
||||
IndexCB: func() []byte {
|
||||
return part.Index // Preserve part Index to ensure decompression works.
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
|
||||
}
|
||||
parts[i] = CompletePart{
|
||||
ETag: pi.ETag,
|
||||
PartNumber: pi.PartNumber,
|
||||
}
|
||||
}
|
||||
_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
|
||||
MTime: oi.ModTime,
|
||||
})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
hr, err := hash.NewReader(gr, oi.Size, "", "", actualSize)
|
||||
if err != nil {
|
||||
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
|
||||
}
|
||||
_, err = z.PutObject(ctx,
|
||||
bucket,
|
||||
oi.Name,
|
||||
NewPutObjReader(hr),
|
||||
ObjectOptions{
|
||||
VersionID: oi.VersionID,
|
||||
MTime: oi.ModTime,
|
||||
UserDefined: oi.UserDefined,
|
||||
PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
|
||||
IndexCB: func() []byte {
|
||||
return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) StartRebalance() {
|
||||
z.rebalMu.Lock()
|
||||
if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do
|
||||
z.rebalMu.Unlock()
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithCancel(GlobalContext)
|
||||
z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called
|
||||
z.rebalMu.Unlock()
|
||||
|
||||
z.rebalMu.RLock()
|
||||
participants := make([]bool, len(z.rebalMeta.PoolStats))
|
||||
for i, ps := range z.rebalMeta.PoolStats {
|
||||
// skip pools which have completed rebalancing
|
||||
if ps.Info.Status != rebalStarted {
|
||||
continue
|
||||
}
|
||||
|
||||
participants[i] = ps.Participating
|
||||
}
|
||||
z.rebalMu.RUnlock()
|
||||
|
||||
for poolIdx, doRebalance := range participants {
|
||||
if !doRebalance {
|
||||
continue
|
||||
}
|
||||
// nothing to do if this node is not pool's first node (i.e pool's rebalance 'leader').
|
||||
if !globalEndpoints[poolIdx].Endpoints[0].IsLocal {
|
||||
continue
|
||||
}
|
||||
|
||||
go func(idx int) {
|
||||
stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
|
||||
err := z.rebalanceBuckets(ctx, idx)
|
||||
stopfn(err)
|
||||
}(poolIdx)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// StopRebalance signals the rebalance goroutine running on this node (if any)
|
||||
// to stop, using the context.CancelFunc(s) saved at the time ofStartRebalance.
|
||||
func (z *erasureServerPools) StopRebalance() error {
|
||||
z.rebalMu.Lock()
|
||||
defer z.rebalMu.Unlock()
|
||||
|
||||
r := z.rebalMeta
|
||||
if r == nil { // rebalance not running in this node, nothing to do
|
||||
return nil
|
||||
}
|
||||
|
||||
if cancel := r.cancel; cancel != nil {
|
||||
// cancel != nil only on pool leaders
|
||||
r.cancel = nil
|
||||
cancel()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// for rebalance trace support
|
||||
type rebalanceMetrics struct{}
|
||||
|
||||
var globalRebalanceMetrics rebalanceMetrics
|
||||
|
||||
//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE
|
||||
type rebalanceMetric uint8
|
||||
|
||||
const (
|
||||
rebalanceMetricRebalanceBuckets rebalanceMetric = iota
|
||||
rebalanceMetricRebalanceBucket
|
||||
rebalanceMetricRebalanceObject
|
||||
rebalanceMetricRebalanceRemoveObject
|
||||
rebalanceMetricSaveMetadata
|
||||
)
|
||||
|
||||
func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo {
|
||||
var errStr string
|
||||
if err != nil {
|
||||
errStr = err.Error()
|
||||
}
|
||||
return madmin.TraceInfo{
|
||||
TraceType: madmin.TraceRebalance,
|
||||
Time: startTime,
|
||||
NodeName: globalLocalNodeName,
|
||||
FuncName: fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
|
||||
Duration: duration,
|
||||
Path: path,
|
||||
Error: errStr,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) {
|
||||
startTime := time.Now()
|
||||
return func(err error) {
|
||||
duration := time.Since(startTime)
|
||||
if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
|
||||
globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " ")))
|
||||
}
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,575 @@
|
|||
package cmd
|
||||
|
||||
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
)
|
||||
|
||||
func TestMarshalUnmarshalrebalanceInfo(t *testing.T) {
|
||||
v := rebalanceInfo{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
left, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
|
||||
}
|
||||
|
||||
left, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMarshalMsgrebalanceInfo(b *testing.B) {
|
||||
v := rebalanceInfo{}
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.MarshalMsg(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendMsgrebalanceInfo(b *testing.B) {
|
||||
v := rebalanceInfo{}
|
||||
bts := make([]byte, 0, v.Msgsize())
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUnmarshalrebalanceInfo(b *testing.B) {
|
||||
v := rebalanceInfo{}
|
||||
bts, _ := v.MarshalMsg(nil)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeDecoderebalanceInfo(t *testing.T) {
|
||||
v := rebalanceInfo{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
|
||||
m := v.Msgsize()
|
||||
if buf.Len() > m {
|
||||
t.Log("WARNING: TestEncodeDecoderebalanceInfo Msgsize() is inaccurate")
|
||||
}
|
||||
|
||||
vn := rebalanceInfo{}
|
||||
err := msgp.Decode(&buf, &vn)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
msgp.Encode(&buf, &v)
|
||||
err = msgp.NewReader(&buf).Skip()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncoderebalanceInfo(b *testing.B) {
|
||||
v := rebalanceInfo{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
en := msgp.NewWriter(msgp.Nowhere)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.EncodeMsg(en)
|
||||
}
|
||||
en.Flush()
|
||||
}
|
||||
|
||||
func BenchmarkDecoderebalanceInfo(b *testing.B) {
|
||||
v := rebalanceInfo{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
rd := msgp.NewEndlessReader(buf.Bytes(), b)
|
||||
dc := msgp.NewReader(rd)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := v.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshalrebalanceMeta(t *testing.T) {
|
||||
v := rebalanceMeta{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
left, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
|
||||
}
|
||||
|
||||
left, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMarshalMsgrebalanceMeta(b *testing.B) {
|
||||
v := rebalanceMeta{}
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.MarshalMsg(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendMsgrebalanceMeta(b *testing.B) {
|
||||
v := rebalanceMeta{}
|
||||
bts := make([]byte, 0, v.Msgsize())
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUnmarshalrebalanceMeta(b *testing.B) {
|
||||
v := rebalanceMeta{}
|
||||
bts, _ := v.MarshalMsg(nil)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeDecoderebalanceMeta(t *testing.T) {
|
||||
v := rebalanceMeta{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
|
||||
m := v.Msgsize()
|
||||
if buf.Len() > m {
|
||||
t.Log("WARNING: TestEncodeDecoderebalanceMeta Msgsize() is inaccurate")
|
||||
}
|
||||
|
||||
vn := rebalanceMeta{}
|
||||
err := msgp.Decode(&buf, &vn)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
msgp.Encode(&buf, &v)
|
||||
err = msgp.NewReader(&buf).Skip()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncoderebalanceMeta(b *testing.B) {
|
||||
v := rebalanceMeta{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
en := msgp.NewWriter(msgp.Nowhere)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.EncodeMsg(en)
|
||||
}
|
||||
en.Flush()
|
||||
}
|
||||
|
||||
func BenchmarkDecoderebalanceMeta(b *testing.B) {
|
||||
v := rebalanceMeta{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
rd := msgp.NewEndlessReader(buf.Bytes(), b)
|
||||
dc := msgp.NewReader(rd)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := v.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshalrebalanceMetrics(t *testing.T) {
|
||||
v := rebalanceMetrics{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
left, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
|
||||
}
|
||||
|
||||
left, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMarshalMsgrebalanceMetrics(b *testing.B) {
|
||||
v := rebalanceMetrics{}
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.MarshalMsg(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendMsgrebalanceMetrics(b *testing.B) {
|
||||
v := rebalanceMetrics{}
|
||||
bts := make([]byte, 0, v.Msgsize())
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUnmarshalrebalanceMetrics(b *testing.B) {
|
||||
v := rebalanceMetrics{}
|
||||
bts, _ := v.MarshalMsg(nil)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeDecoderebalanceMetrics(t *testing.T) {
|
||||
v := rebalanceMetrics{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
|
||||
m := v.Msgsize()
|
||||
if buf.Len() > m {
|
||||
t.Log("WARNING: TestEncodeDecoderebalanceMetrics Msgsize() is inaccurate")
|
||||
}
|
||||
|
||||
vn := rebalanceMetrics{}
|
||||
err := msgp.Decode(&buf, &vn)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
msgp.Encode(&buf, &v)
|
||||
err = msgp.NewReader(&buf).Skip()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncoderebalanceMetrics(b *testing.B) {
|
||||
v := rebalanceMetrics{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
en := msgp.NewWriter(msgp.Nowhere)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.EncodeMsg(en)
|
||||
}
|
||||
en.Flush()
|
||||
}
|
||||
|
||||
func BenchmarkDecoderebalanceMetrics(b *testing.B) {
|
||||
v := rebalanceMetrics{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
rd := msgp.NewEndlessReader(buf.Bytes(), b)
|
||||
dc := msgp.NewReader(rd)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := v.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshalrebalanceStats(t *testing.T) {
|
||||
v := rebalanceStats{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
left, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
|
||||
}
|
||||
|
||||
left, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMarshalMsgrebalanceStats(b *testing.B) {
|
||||
v := rebalanceStats{}
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.MarshalMsg(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendMsgrebalanceStats(b *testing.B) {
|
||||
v := rebalanceStats{}
|
||||
bts := make([]byte, 0, v.Msgsize())
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUnmarshalrebalanceStats(b *testing.B) {
|
||||
v := rebalanceStats{}
|
||||
bts, _ := v.MarshalMsg(nil)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeDecoderebalanceStats(t *testing.T) {
|
||||
v := rebalanceStats{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
|
||||
m := v.Msgsize()
|
||||
if buf.Len() > m {
|
||||
t.Log("WARNING: TestEncodeDecoderebalanceStats Msgsize() is inaccurate")
|
||||
}
|
||||
|
||||
vn := rebalanceStats{}
|
||||
err := msgp.Decode(&buf, &vn)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
msgp.Encode(&buf, &v)
|
||||
err = msgp.NewReader(&buf).Skip()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncoderebalanceStats(b *testing.B) {
|
||||
v := rebalanceStats{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
en := msgp.NewWriter(msgp.Nowhere)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.EncodeMsg(en)
|
||||
}
|
||||
en.Flush()
|
||||
}
|
||||
|
||||
func BenchmarkDecoderebalanceStats(b *testing.B) {
|
||||
v := rebalanceStats{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
rd := msgp.NewEndlessReader(buf.Bytes(), b)
|
||||
dc := msgp.NewReader(rd)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := v.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshalrstats(t *testing.T) {
|
||||
v := rstats{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
left, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
|
||||
}
|
||||
|
||||
left, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMarshalMsgrstats(b *testing.B) {
|
||||
v := rstats{}
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.MarshalMsg(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendMsgrstats(b *testing.B) {
|
||||
v := rstats{}
|
||||
bts := make([]byte, 0, v.Msgsize())
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUnmarshalrstats(b *testing.B) {
|
||||
v := rstats{}
|
||||
bts, _ := v.MarshalMsg(nil)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeDecoderstats(t *testing.T) {
|
||||
v := rstats{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
|
||||
m := v.Msgsize()
|
||||
if buf.Len() > m {
|
||||
t.Log("WARNING: TestEncodeDecoderstats Msgsize() is inaccurate")
|
||||
}
|
||||
|
||||
vn := rstats{}
|
||||
err := msgp.Decode(&buf, &vn)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
msgp.Encode(&buf, &v)
|
||||
err = msgp.NewReader(&buf).Skip()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncoderstats(b *testing.B) {
|
||||
v := rstats{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
en := msgp.NewWriter(msgp.Nowhere)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.EncodeMsg(en)
|
||||
}
|
||||
en.Flush()
|
||||
}
|
||||
|
||||
func BenchmarkDecoderstats(b *testing.B) {
|
||||
v := rstats{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
rd := msgp.NewEndlessReader(buf.Bytes(), b)
|
||||
dc := msgp.NewReader(rd)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := v.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -44,6 +44,10 @@ import (
|
|||
type erasureServerPools struct {
|
||||
poolMetaMutex sync.RWMutex
|
||||
poolMeta poolMeta
|
||||
|
||||
rebalMu sync.RWMutex
|
||||
rebalMeta *rebalanceMeta
|
||||
|
||||
serverPools []*erasureSets
|
||||
|
||||
// Shut down async operations
|
||||
|
@ -327,8 +331,9 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
|
|||
g := errgroup.WithNErrs(len(z.serverPools))
|
||||
for index := range z.serverPools {
|
||||
index := index
|
||||
// skip suspended pools for any new I/O.
|
||||
if z.IsSuspended(index) {
|
||||
// Skip suspended pools or pools participating in rebalance for any new
|
||||
// I/O.
|
||||
if z.IsSuspended(index) || z.IsPoolRebalancing(index) {
|
||||
continue
|
||||
}
|
||||
pool := z.serverPools[index]
|
||||
|
@ -426,6 +431,10 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
|
|||
if z.IsSuspended(pinfo.Index) && opts.SkipDecommissioned {
|
||||
continue
|
||||
}
|
||||
// Skip object if it's from pools participating in a rebalance operation.
|
||||
if opts.SkipRebalancing && z.IsPoolRebalancing(pinfo.Index) {
|
||||
continue
|
||||
}
|
||||
|
||||
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
|
||||
return pinfo, pinfo.Err
|
||||
|
@ -466,6 +475,7 @@ func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucke
|
|||
return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{
|
||||
NoLock: true,
|
||||
SkipDecommissioned: true,
|
||||
SkipRebalancing: true,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -489,7 +499,10 @@ func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, objec
|
|||
// if none are found falls back to most available space pool, this function is
|
||||
// designed to be only used by PutObject, CopyObject (newObject creation) and NewMultipartUpload.
|
||||
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
|
||||
idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{SkipDecommissioned: true})
|
||||
idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{
|
||||
SkipDecommissioned: true,
|
||||
SkipRebalancing: true,
|
||||
})
|
||||
if err != nil && !isErrObjectNotFound(err) {
|
||||
return idx, err
|
||||
}
|
||||
|
@ -1387,9 +1400,10 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
|
|||
}
|
||||
|
||||
for idx, pool := range z.serverPools {
|
||||
if z.IsSuspended(idx) {
|
||||
if z.IsSuspended(idx) || z.IsPoolRebalancing(idx) {
|
||||
continue
|
||||
}
|
||||
|
||||
result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -91,7 +91,7 @@ func (iamOS *IAMObjectStore) saveIAMConfig(ctx context.Context, item interface{}
|
|||
}
|
||||
|
||||
func (iamOS *IAMObjectStore) loadIAMConfigBytesWithMetadata(ctx context.Context, objPath string) ([]byte, ObjectInfo, error) {
|
||||
data, meta, err := readConfigWithMetadata(ctx, iamOS.objAPI, objPath)
|
||||
data, meta, err := readConfigWithMetadata(ctx, iamOS.objAPI, objPath, ObjectOptions{})
|
||||
if err != nil {
|
||||
return nil, meta, err
|
||||
}
|
||||
|
|
|
@ -630,6 +630,49 @@ func (sys *NotificationSys) ReloadPoolMeta(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
// StopRebalance notifies all MinIO nodes to signal any ongoing rebalance
|
||||
// goroutine to stop.
|
||||
func (sys *NotificationSys) StopRebalance(ctx context.Context) {
|
||||
ng := WithNPeers(len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
if client == nil {
|
||||
continue
|
||||
}
|
||||
client := client
|
||||
ng.Go(ctx, func() error {
|
||||
return client.StopRebalance(ctx)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
for _, nErr := range ng.Wait() {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
|
||||
if nErr.Err != nil {
|
||||
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// LoadRebalanceMeta notifies all peers to load rebalance.bin from object layer.
|
||||
// Note: Only peers participating in rebalance operation, namely the first node
|
||||
// in each pool will load rebalance.bin.
|
||||
func (sys *NotificationSys) LoadRebalanceMeta(ctx context.Context, startRebalance bool) {
|
||||
ng := WithNPeers(len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
if client == nil {
|
||||
continue
|
||||
}
|
||||
client := client
|
||||
ng.Go(ctx, func() error {
|
||||
return client.LoadRebalanceMeta(ctx, startRebalance)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
for _, nErr := range ng.Wait() {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
|
||||
if nErr.Err != nil {
|
||||
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// LoadTransitionTierConfig notifies remote peers to load their remote tier
|
||||
// configs from config store.
|
||||
func (sys *NotificationSys) LoadTransitionTierConfig(ctx context.Context) {
|
||||
|
|
|
@ -85,6 +85,9 @@ type ObjectOptions struct {
|
|||
// SkipDecommissioned set to 'true' if the call requires skipping the pool being decommissioned.
|
||||
// mainly set for certain WRITE operations.
|
||||
SkipDecommissioned bool
|
||||
// SkipRebalancing should be set to 'true' if the call should skip pools
|
||||
// participating in a rebalance operation. Typically set for 'write' operations.
|
||||
SkipRebalancing bool
|
||||
|
||||
WalkFilter func(info FileInfo) bool // return WalkFilter returns 'true/false'
|
||||
WalkMarker string // set to skip until this object
|
||||
|
|
|
@ -551,6 +551,28 @@ func (client *peerRESTClient) ReloadPoolMeta(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (client *peerRESTClient) StopRebalance(ctx context.Context) error {
|
||||
respBody, err := client.callWithContext(ctx, peerRESTMethodStopRebalance, nil, nil, 0)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (client *peerRESTClient) LoadRebalanceMeta(ctx context.Context, startRebalance bool) error {
|
||||
values := url.Values{}
|
||||
values.Set(peerRESTStartRebalance, strconv.FormatBool(startRebalance))
|
||||
respBody, err := client.callWithContext(ctx, peerRESTMethodLoadRebalanceMeta, values, nil, 0)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) error {
|
||||
respBody, err := client.callWithContext(ctx, peerRESTMethodLoadTransitionTierConfig, nil, nil, 0)
|
||||
if err != nil {
|
||||
|
|
|
@ -18,7 +18,8 @@
|
|||
package cmd
|
||||
|
||||
const (
|
||||
peerRESTVersion = "v27" // change in GetAllBucketStats response.
|
||||
peerRESTVersion = "v28" // Added Rebalance peer APIs
|
||||
|
||||
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
|
||||
peerRESTPrefix = minioReservedBucketPath + "/peer"
|
||||
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
|
||||
|
@ -68,6 +69,8 @@ const (
|
|||
peerRESTMethodDriveSpeedTest = "/drivespeedtest"
|
||||
peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig"
|
||||
peerRESTMethodReloadPoolMeta = "/reloadpoolmeta"
|
||||
peerRESTMethodLoadRebalanceMeta = "/loadrebalancemeta"
|
||||
peerRESTMethodStopRebalance = "/stoprebalance"
|
||||
peerRESTMethodGetLastDayTierStats = "/getlastdaytierstats"
|
||||
peerRESTMethodDevNull = "/devnull"
|
||||
peerRESTMethodNetperf = "/netperf"
|
||||
|
@ -94,6 +97,7 @@ const (
|
|||
peerRESTMetricsTypes = "types"
|
||||
peerRESTDisk = "disk"
|
||||
peerRESTJobID = "job-id"
|
||||
peerRESTStartRebalance = "start-rebalance"
|
||||
|
||||
peerRESTListenBucket = "bucket"
|
||||
peerRESTListenPrefix = "prefix"
|
||||
|
|
|
@ -1048,6 +1048,60 @@ func (s *peerRESTServer) ReloadPoolMetaHandler(w http.ResponseWriter, r *http.Re
|
|||
}
|
||||
}
|
||||
|
||||
func (s *peerRESTServer) StopRebalanceHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
s.writeErrorResponse(w, errors.New("invalid request"))
|
||||
return
|
||||
}
|
||||
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
s.writeErrorResponse(w, errServerNotInitialized)
|
||||
return
|
||||
}
|
||||
pools, ok := objAPI.(*erasureServerPools)
|
||||
if !ok {
|
||||
s.writeErrorResponse(w, errors.New("not a multiple pools setup"))
|
||||
return
|
||||
}
|
||||
|
||||
pools.StopRebalance()
|
||||
}
|
||||
|
||||
func (s *peerRESTServer) LoadRebalanceMetaHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
s.writeErrorResponse(w, errors.New("invalid request"))
|
||||
return
|
||||
}
|
||||
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
s.writeErrorResponse(w, errServerNotInitialized)
|
||||
return
|
||||
}
|
||||
|
||||
pools, ok := objAPI.(*erasureServerPools)
|
||||
if !ok {
|
||||
s.writeErrorResponse(w, errors.New("not a multiple pools setup"))
|
||||
return
|
||||
}
|
||||
|
||||
startRebalanceStr := r.Form.Get(peerRESTStartRebalance)
|
||||
startRebalance, err := strconv.ParseBool(startRebalanceStr)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := pools.loadRebalanceMeta(r.Context()); err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
if startRebalance {
|
||||
go pools.StartRebalance()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *peerRESTServer) LoadTransitionTierConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
s.writeErrorResponse(w, errors.New("invalid request"))
|
||||
|
@ -1352,5 +1406,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
|
|||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDevNull).HandlerFunc(httpTraceHdrs(server.DevNull))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadRebalanceMeta).HandlerFunc(httpTraceHdrs(server.LoadRebalanceMetaHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStopRebalance).HandlerFunc(httpTraceHdrs(server.StopRebalanceHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(httpTraceHdrs(server.GetLastDayTierStatsHandler))
|
||||
}
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
// Copyright (c) 2022 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type rebalPoolProgress struct {
|
||||
NumObjects uint64 `json:"objects"`
|
||||
NumVersions uint64 `json:"versions"`
|
||||
Bytes uint64 `json:"bytes"`
|
||||
Bucket string `json:"bucket"`
|
||||
Object string `json:"object"`
|
||||
Elapsed time.Duration `json:"elapsed"`
|
||||
ETA time.Duration `json:"eta"`
|
||||
}
|
||||
|
||||
type rebalancePoolStatus struct {
|
||||
ID int `json:"id"` // Pool index (zero-based)
|
||||
Status string `json:"status"` // Active if rebalance is running, empty otherwise
|
||||
Used float64 `json:"used"` // Percentage used space
|
||||
Progress rebalPoolProgress `json:"progress,omitempty"` // is empty when rebalance is not running
|
||||
}
|
||||
|
||||
// rebalanceAdminStatus holds rebalance status related information exported to mc, console, etc.
|
||||
type rebalanceAdminStatus struct {
|
||||
ID string // identifies the ongoing rebalance operation by a uuid
|
||||
Pools []rebalancePoolStatus `json:"pools"` // contains all pools, including inactive
|
||||
StoppedAt time.Time `json:"stoppedAt,omitempty"`
|
||||
}
|
||||
|
||||
func rebalanceStatus(ctx context.Context, z *erasureServerPools) (r rebalanceAdminStatus, err error) {
|
||||
// Load latest rebalance status
|
||||
meta := &rebalanceMeta{}
|
||||
err = meta.load(ctx, z.serverPools[0])
|
||||
if err != nil {
|
||||
return r, err
|
||||
}
|
||||
|
||||
// Compute disk usage percentage
|
||||
si, _ := z.StorageInfo(ctx)
|
||||
diskStats := make([]struct {
|
||||
AvailableSpace uint64
|
||||
TotalSpace uint64
|
||||
}, len(z.serverPools))
|
||||
for _, disk := range si.Disks {
|
||||
diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
|
||||
diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
|
||||
}
|
||||
|
||||
stopTime := meta.StoppedAt
|
||||
r = rebalanceAdminStatus{
|
||||
ID: meta.ID,
|
||||
StoppedAt: meta.StoppedAt,
|
||||
Pools: make([]rebalancePoolStatus, len(meta.PoolStats)),
|
||||
}
|
||||
for i, ps := range meta.PoolStats {
|
||||
r.Pools[i] = rebalancePoolStatus{
|
||||
ID: i,
|
||||
Status: ps.Info.Status.String(),
|
||||
Used: float64(diskStats[i].TotalSpace-diskStats[i].AvailableSpace) / float64(diskStats[i].TotalSpace),
|
||||
}
|
||||
if !ps.Participating {
|
||||
continue
|
||||
}
|
||||
// for participating pools, total bytes to be rebalanced by this pool is given by,
|
||||
// pf_c = (f_i + x)/c_i,
|
||||
// pf_c - percentage free space across pools, f_i - ith pool's free space, c_i - ith pool's capacity
|
||||
// i.e. x = c_i*pfc -f_i
|
||||
totalBytesToRebal := float64(ps.InitCapacity)*meta.PercentFreeGoal - float64(ps.InitFreeSpace)
|
||||
elapsed := time.Since(ps.Info.StartTime)
|
||||
eta := time.Duration(totalBytesToRebal * float64(elapsed) / float64(ps.Bytes))
|
||||
if !ps.Info.EndTime.IsZero() {
|
||||
stopTime = ps.Info.EndTime
|
||||
}
|
||||
|
||||
if !stopTime.IsZero() { // rebalance is stopped or completed
|
||||
elapsed = stopTime.Sub(ps.Info.StartTime)
|
||||
eta = 0
|
||||
}
|
||||
|
||||
r.Pools[i].Progress = rebalPoolProgress{
|
||||
NumObjects: ps.NumObjects,
|
||||
NumVersions: ps.NumVersions,
|
||||
Bytes: ps.Bytes,
|
||||
Elapsed: elapsed,
|
||||
ETA: eta,
|
||||
}
|
||||
}
|
||||
return r, nil
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
// Code generated by "stringer -type=rebalanceMetric -trimprefix=rebalanceMetric erasure-server-pool-rebalance.go"; DO NOT EDIT.
|
||||
|
||||
package cmd
|
||||
|
||||
import "strconv"
|
||||
|
||||
func _() {
|
||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||
// Re-run the stringer command to generate them again.
|
||||
var x [1]struct{}
|
||||
_ = x[rebalanceMetricRebalanceBuckets-0]
|
||||
_ = x[rebalanceMetricRebalanceBucket-1]
|
||||
_ = x[rebalanceMetricRebalanceObject-2]
|
||||
_ = x[rebalanceMetricRebalanceRemoveObject-3]
|
||||
_ = x[rebalanceMetricSaveMetadata-4]
|
||||
}
|
||||
|
||||
const _rebalanceMetric_name = "RebalanceBucketsRebalanceBucketRebalanceObjectRebalanceRemoveObjectSaveMetadata"
|
||||
|
||||
var _rebalanceMetric_index = [...]uint8{0, 16, 31, 46, 67, 79}
|
||||
|
||||
func (i rebalanceMetric) String() string {
|
||||
if i >= rebalanceMetric(len(_rebalanceMetric_index)-1) {
|
||||
return "rebalanceMetric(" + strconv.FormatInt(int64(i), 10) + ")"
|
||||
}
|
||||
return _rebalanceMetric_name[_rebalanceMetric_index[i]:_rebalanceMetric_index[i+1]]
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
// Code generated by "stringer -type=rebalStatus -trimprefix=rebal erasure-server-pool-rebalance.go"; DO NOT EDIT.
|
||||
|
||||
package cmd
|
||||
|
||||
import "strconv"
|
||||
|
||||
func _() {
|
||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||
// Re-run the stringer command to generate them again.
|
||||
var x [1]struct{}
|
||||
_ = x[rebalNone-0]
|
||||
_ = x[rebalStarted-1]
|
||||
_ = x[rebalCompleted-2]
|
||||
_ = x[rebalStopped-3]
|
||||
_ = x[rebalFailed-4]
|
||||
}
|
||||
|
||||
const _rebalStatus_name = "NoneStartedCompletedStoppedFailed"
|
||||
|
||||
var _rebalStatus_index = [...]uint8{0, 4, 11, 20, 27, 33}
|
||||
|
||||
func (i rebalStatus) String() string {
|
||||
if i >= rebalStatus(len(_rebalStatus_index)-1) {
|
||||
return "rebalStatus(" + strconv.FormatInt(int64(i), 10) + ")"
|
||||
}
|
||||
return _rebalStatus_name[_rebalStatus_index[i]:_rebalStatus_index[i+1]]
|
||||
}
|
Loading…
Reference in New Issue