feat: decommission feature for pools (#14012)
```
λ mc admin decommission start alias/ http://minio{1...2}/data{1...4}
```

```
λ mc admin decommission status alias/
┌─────┬─────────────────────────────────┬──────────────────────────────────┬────────┐
│ ID  │ Pools                           │ Capacity                         │ Status │
│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Active │
│ 2nd │ http://minio{3...4}/data{1...4} │ 329 GiB (used) / 421 GiB (total) │ Active │
└─────┴─────────────────────────────────┴──────────────────────────────────┴────────┘
```

```
λ mc admin decommission status alias/ http://minio{1...2}/data{1...4}
Progress: ===================> [1GiB/sec] [15%] [4TiB/50TiB]
Time Remaining: 4 hours (started 3 hours ago)
```

```
λ mc admin decommission status alias/ http://minio{1...2}/data{1...4}
ERROR: This pool is not scheduled for decommissioning currently.
```

```
λ mc admin decommission cancel alias/
┌─────┬─────────────────────────────────┬──────────────────────────────────┬──────────┐
│ ID  │ Pools                           │ Capacity                         │ Status   │
│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining │
└─────┴─────────────────────────────────┴──────────────────────────────────┴──────────┘
```

> NOTE: Canceling a decommission does not make the pool active again, since the other
> pools may already hold partially duplicated content. To avoid this scenario, be
> very sure to start decommissioning only as a planned activity.

```
λ mc admin decommission cancel alias/ http://minio{1...2}/data{1...4}
┌─────┬─────────────────────────────────┬──────────────────────────────────┬────────────────────┐
│ ID  │ Pools                           │ Capacity                         │ Status             │
│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining(Canceled) │
└─────┴─────────────────────────────────┴──────────────────────────────────┴────────────────────┘
```
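The NOTE above follows from how the pool metadata is handled in the diff below: a pool keeps its `PoolDecommissionInfo` record after cancellation, a completed decommission can never be restarted, and only canceled or failed attempts (whose `StartTime` is reset to zero) may be retried. A minimal, self-contained sketch of that decision logic, using simplified stand-in types rather than the actual `cmd` package code:

```go
// Standalone sketch (simplified types, not the actual MinIO code) of the
// state checks performed by poolMeta.Decommission() for a single pool.
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errAlreadyRunning = errors.New("decommission is already in progress")
	errComplete       = errors.New("decommission is complete, please remove the servers from command-line")
)

// decomState mirrors the Complete/Failed/Canceled flags of PoolDecommissionInfo.
type decomState struct {
	StartTime time.Time
	Complete  bool
	Failed    bool
	Canceled  bool
}

// canStart reports whether a new decommission may begin for this pool.
func canStart(d *decomState) error {
	if d == nil {
		return nil // never decommissioned before: OK to start
	}
	if d.Complete {
		return errComplete // finished pools must be removed from the command line
	}
	if d.StartTime.IsZero() && (d.Canceled || d.Failed) {
		return nil // canceled or failed attempts may be retried
	}
	return errAlreadyRunning
}

func main() {
	fmt.Println(canStart(nil))                                // <nil>
	fmt.Println(canStart(&decomState{Complete: true}))        // complete: cannot restart
	fmt.Println(canStart(&decomState{Canceled: true}))        // <nil>: retry allowed
	fmt.Println(canStart(&decomState{StartTime: time.Now()})) // already in progress
}
```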
`cmd/erasure-server-pool-decom.go` (new file, 919 lines):
```go
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"net/http"
	"sort"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/minio/madmin-go"
	"github.com/minio/minio/internal/config/storageclass"
	"github.com/minio/minio/internal/hash"
	"github.com/minio/minio/internal/logger"
)

// PoolDecommissionInfo currently decommissioning information
type PoolDecommissionInfo struct {
	StartTime   time.Time `json:"startTime" msg:"st"`
	StartSize   int64     `json:"startSize" msg:"ss"`
	TotalSize   int64     `json:"totalSize" msg:"ts"`
	CurrentSize int64     `json:"currentSize" msg:"cs"`

	Complete bool `json:"complete" msg:"cmp"`
	Failed   bool `json:"failed" msg:"fl"`
	Canceled bool `json:"canceled" msg:"cnl"`

	// Internal information.
	QueuedBuckets         []string `json:"-" msg:"bkts"`
	DecommissionedBuckets []string `json:"-" msg:"dbkts"`

	// Last bucket/object decommissioned.
	Bucket string `json:"-" msg:"bkt"`
	Object string `json:"-" msg:"obj"`

	// Verbose information
	ItemsDecommissioned     uint64 `json:"-" msg:"id"`
	ItemsDecommissionFailed uint64 `json:"-" msg:"idf"`
	BytesDone               uint64 `json:"-" msg:"bd"`
	BytesFailed             uint64 `json:"-" msg:"bf"`
}

// bucketPop should be called when a bucket is done decommissioning.
// Adds the bucket to the list of decommissioned buckets and updates resume numbers.
func (pd *PoolDecommissionInfo) bucketPop(bucket string) {
	pd.DecommissionedBuckets = append(pd.DecommissionedBuckets, bucket)
	for i, b := range pd.QueuedBuckets {
		if b == bucket {
			// Bucket is done.
			pd.QueuedBuckets = append(pd.QueuedBuckets[:i], pd.QueuedBuckets[i+1:]...)
		}
	}
}

func (pd *PoolDecommissionInfo) bucketsToDecommission() []string {
	return pd.QueuedBuckets
}

func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
	for _, b := range pd.DecommissionedBuckets {
		if b == bucket {
			return true
		}
	}
	return false
}

func (pd *PoolDecommissionInfo) bucketPush(bucket string) {
	for _, b := range pd.QueuedBuckets {
		if pd.isBucketDecommissioned(b) {
			return
		}
		if b == bucket {
			return
		}
	}
	pd.QueuedBuckets = append(pd.QueuedBuckets, bucket)
}

// PoolStatus captures current pool status
type PoolStatus struct {
	ID           int                   `json:"id" msg:"id"`
	CmdLine      string                `json:"cmdline" msg:"cl"`
	LastUpdate   time.Time             `json:"lastUpdate" msg:"lu"`
	Decommission *PoolDecommissionInfo `json:"decommissionInfo,omitempty" msg:"dec"`
}

//go:generate msgp -file $GOFILE -unexported
type poolMeta struct {
	Version int          `msg:"v"`
	Pools   []PoolStatus `msg:"pls"`
}

// A decommission resumable tells us if decommission is worth
// resuming upon restart of a cluster.
func (p *poolMeta) returnResumablePools(n int) []PoolStatus {
	var newPools []PoolStatus
	for _, pool := range p.Pools {
		if pool.Decommission == nil {
			continue
		}
		if pool.Decommission.Complete || pool.Decommission.Canceled {
			// Do not resume decommission upon startup for
			// - decommission complete
			// - decommission canceled
			continue
		} // In all other situations we need to resume
		newPools = append(newPools, pool)
		if n > 0 && len(newPools) == n {
			return newPools
		}
	}
	return nil
}

func (p *poolMeta) DecommissionComplete(idx int) bool {
	if p.Pools[idx].Decommission != nil {
		p.Pools[idx].LastUpdate = time.Now().UTC()
		p.Pools[idx].Decommission.Complete = true
		p.Pools[idx].Decommission.Failed = false
		p.Pools[idx].Decommission.Canceled = false
		return true
	}
	return false
}

func (p *poolMeta) DecommissionFailed(idx int) bool {
	if p.Pools[idx].Decommission != nil {
		p.Pools[idx].LastUpdate = time.Now().UTC()
		p.Pools[idx].Decommission.StartTime = time.Time{}
		p.Pools[idx].Decommission.Complete = false
		p.Pools[idx].Decommission.Failed = true
		p.Pools[idx].Decommission.Canceled = false
		return true
	}
	return false
}

func (p *poolMeta) DecommissionCancel(idx int) bool {
	if p.Pools[idx].Decommission != nil {
		p.Pools[idx].LastUpdate = time.Now().UTC()
		p.Pools[idx].Decommission.StartTime = time.Time{}
		p.Pools[idx].Decommission.Complete = false
		p.Pools[idx].Decommission.Failed = false
		p.Pools[idx].Decommission.Canceled = true
		return true
	}
	return false
}

func (p poolMeta) isBucketDecommissioned(idx int, bucket string) bool {
	return p.Pools[idx].Decommission.isBucketDecommissioned(bucket)
}

func (p *poolMeta) BucketDone(idx int, bucket string) {
	if p.Pools[idx].Decommission == nil {
		// Decommission not in progress.
		return
	}
	p.Pools[idx].Decommission.bucketPop(bucket)
}

func (p poolMeta) ResumeBucketObject(idx int) (bucket, object string) {
	if p.Pools[idx].Decommission != nil {
		bucket = p.Pools[idx].Decommission.Bucket
		object = p.Pools[idx].Decommission.Object
	}
	return
}

func (p *poolMeta) TrackCurrentBucketObject(idx int, bucket string, object string) {
	if p.Pools[idx].Decommission == nil {
		// Decommission not in progress.
		return
	}
	p.Pools[idx].Decommission.Bucket = bucket
	p.Pools[idx].Decommission.Object = object
}

func (p *poolMeta) PendingBuckets(idx int) []string {
	if p.Pools[idx].Decommission == nil {
		// Decommission not in progress.
		return nil
	}

	return p.Pools[idx].Decommission.bucketsToDecommission()
}

func (p *poolMeta) QueueBuckets(idx int, buckets []BucketInfo) {
	// add new queued buckets
	for _, bucket := range buckets {
		p.Pools[idx].Decommission.bucketPush(bucket.Name)
	}
}

var (
	errDecommissionAlreadyRunning = errors.New("decommission is already in progress")
	errDecommissionComplete       = errors.New("decommission is complete, please remove the servers from command-line")
)

func (p *poolMeta) Decommission(idx int, info StorageInfo) error {
	for i, pool := range p.Pools {
		if idx == i {
			continue
		}
		if pool.Decommission != nil {
			// Do not allow multiple decommissions at the same time.
			// We shall for now only allow one pool decommission at
			// a time.
			return fmt.Errorf("%w at index: %d", errDecommissionAlreadyRunning, i)
		}
	}
	if p.Pools[idx].Decommission == nil {
		startSize := TotalUsableCapacityFree(info)
		totalSize := TotalUsableCapacity(info)
		p.Pools[idx].LastUpdate = time.Now().UTC()
		p.Pools[idx].Decommission = &PoolDecommissionInfo{
			StartTime:   UTCNow(),
			StartSize:   startSize,
			CurrentSize: startSize,
			TotalSize:   totalSize,
		}
		return nil
	}

	// Completed pool doesn't need to be decommissioned again.
	if p.Pools[idx].Decommission.Complete {
		return errDecommissionComplete
	}

	// Canceled or Failed decommission can be triggered again.
	if p.Pools[idx].Decommission.StartTime.IsZero() {
		if p.Pools[idx].Decommission.Canceled || p.Pools[idx].Decommission.Failed {
			startSize := TotalUsableCapacityFree(info)
			totalSize := TotalUsableCapacity(info)
			p.Pools[idx].LastUpdate = time.Now().UTC()
			p.Pools[idx].Decommission = &PoolDecommissionInfo{
				StartTime:   UTCNow(),
				StartSize:   startSize,
				CurrentSize: startSize,
				TotalSize:   totalSize,
			}
			return nil
		}
	} // In-progress pool doesn't need to be decommissioned again.

	// In all other scenarios an active decommissioning is in progress.
	return errDecommissionAlreadyRunning
}

func (p poolMeta) IsSuspended(idx int) bool {
	return p.Pools[idx].Decommission != nil
}

func (p *poolMeta) load(ctx context.Context, pool *erasureSets, pools []*erasureSets) (bool, error) {
	data, err := readConfig(ctx, pool, poolMetaName)
	if err != nil {
		if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
			return true, nil
		}
		return false, err
	}
	if len(data) == 0 {
		return true, nil
	}
	if len(data) <= 4 {
		return false, fmt.Errorf("poolMeta: no data")
	}
	// Read header
	switch binary.LittleEndian.Uint16(data[0:2]) {
	case poolMetaFormat:
	default:
		return false, fmt.Errorf("poolMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
	}
	switch binary.LittleEndian.Uint16(data[2:4]) {
	case poolMetaVersion:
	default:
		return false, fmt.Errorf("poolMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
	}

	// OK, parse data.
	if _, err = p.UnmarshalMsg(data[4:]); err != nil {
		return false, err
	}

	switch p.Version {
	case poolMetaVersionV1:
	default:
		return false, fmt.Errorf("unexpected pool meta version: %d", p.Version)
	}

	type poolInfo struct {
		position  int
		completed bool
	}

	rememberedPools := make(map[string]poolInfo)
	for idx, pool := range p.Pools {
		complete := false
		if pool.Decommission != nil && pool.Decommission.Complete {
			complete = true
		}
		rememberedPools[pool.CmdLine] = poolInfo{
			position:  idx,
			completed: complete,
		}
	}

	specifiedPools := make(map[string]int)
	for idx, pool := range pools {
		specifiedPools[pool.endpoints.CmdLine] = idx
	}

	// Check if specified pools need to remove decommissioned pool.
	for k := range specifiedPools {
		pi, ok := rememberedPools[k]
		if ok && pi.completed {
			return false, fmt.Errorf("pool(%s) = %s is decommissioned, please remove from server command line", humanize.Ordinal(pi.position+1), k)
		}
	}

	// check if remembered pools are in right position or missing from command line.
	for k, pi := range rememberedPools {
		if pi.completed {
			continue
		}
		_, ok := specifiedPools[k]
		if !ok {
			return false, fmt.Errorf("pool(%s) = %s is not specified, please specify on server command line", humanize.Ordinal(pi.position+1), k)
		}
	}

	// check when remembered pools and specified pools are same they are at the expected position
	if len(rememberedPools) == len(specifiedPools) {
		for k, pi := range rememberedPools {
			pos, ok := specifiedPools[k]
			if !ok {
				return false, fmt.Errorf("pool(%s) = %s is not specified, please specify on server command line", humanize.Ordinal(pi.position+1), k)
			}
			if pos != pi.position {
				return false, fmt.Errorf("pool order change detected for %s, expected position is (%s) but found (%s)", k, humanize.Ordinal(pi.position+1), humanize.Ordinal(pos+1))
			}
		}
	}

	return len(rememberedPools) != len(specifiedPools), nil
}

func (p *poolMeta) CountItem(idx int, size int64, failed bool) {
	pd := p.Pools[idx].Decommission
	if pd != nil {
		if failed {
			pd.ItemsDecommissionFailed++
			pd.BytesFailed += uint64(size)
		} else {
			pd.ItemsDecommissioned++
			pd.BytesDone += uint64(size)
		}
		p.Pools[idx].Decommission = pd
	}
}

func (p *poolMeta) updateAfter(ctx context.Context, idx int, pools []*erasureSets, duration time.Duration) error {
	if p.Pools[idx].Decommission == nil {
		return errInvalidArgument
	}
	if time.Since(p.Pools[idx].LastUpdate) > duration {
		p.Pools[idx].LastUpdate = time.Now().UTC()
		return p.save(ctx, pools)
	}
	return nil
}

func (p poolMeta) save(ctx context.Context, pools []*erasureSets) error {
	data := make([]byte, 4, p.Msgsize()+4)

	// Initialize the header.
	binary.LittleEndian.PutUint16(data[0:2], poolMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], poolMetaVersion)

	buf, err := p.MarshalMsg(data)
	if err != nil {
		return err
	}

	// Saves on all pools to make sure decommissioning of first pool is allowed.
	for _, eset := range pools {
		if err = saveConfig(ctx, eset, poolMetaName, buf); err != nil {
			return err
		}
	}
	return nil
}

const (
	poolMetaName      = "pool.bin"
	poolMetaFormat    = 1
	poolMetaVersionV1 = 1
	poolMetaVersion   = poolMetaVersionV1
)

// Init() initializes pools and saves additional information about them
// in 'pool.bin', this is eventually used for decommissioning the pool.
func (z *erasureServerPools) Init(ctx context.Context) error {
	meta := poolMeta{}

	update, err := meta.load(ctx, z.serverPools[0], z.serverPools)
	if err != nil {
		return err
	}

	// if no update is needed return right away.
	if !update {
		// We are only supporting single pool decommission at this time
		// so it makes sense to only resume single pools at any given
		// time, in future meta.returnResumablePools() might take
		// '-1' as argument to decommission multiple pools at a time
		// but this is not a priority at the moment.
		for _, pool := range meta.returnResumablePools(1) {
			err := z.Decommission(ctx, pool.ID)
			switch err {
			case errDecommissionAlreadyRunning:
				fallthrough
			case nil:
				go z.doDecommissionInRoutine(ctx, pool.ID)
			}
		}
		z.poolMeta = meta

		return nil
	}

	// looks like new pool was added we need to update,
	// or this is a fresh installation (or an existing
	// installation with pool removed)
	meta.Version = poolMetaVersion
	for idx, pool := range z.serverPools {
		meta.Pools = append(meta.Pools, PoolStatus{
			CmdLine:    pool.endpoints.CmdLine,
			ID:         idx,
			LastUpdate: time.Now().UTC(),
		})
	}
	if err = meta.save(ctx, z.serverPools); err != nil {
		return err
	}
	z.poolMeta = meta
	return nil
}

func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
	defer gr.Close()
	objInfo := gr.ObjInfo
	if objInfo.isMultipart() {
		uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name,
			ObjectOptions{
				VersionID:   objInfo.VersionID,
				MTime:       objInfo.ModTime,
				UserDefined: objInfo.UserDefined,
			})
		if err != nil {
			return err
		}
		defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{})
		parts := make([]CompletePart, 0, len(objInfo.Parts))
		for _, part := range objInfo.Parts {
			hr, err := hash.NewReader(gr, part.Size, "", "", part.Size)
			if err != nil {
				return err
			}
			_, err = z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID,
				part.Number,
				NewPutObjReader(hr),
				ObjectOptions{})
			if err != nil {
				return err
			}
			parts = append(parts, CompletePart{
				PartNumber: part.Number,
				ETag:       part.ETag,
			})
		}
		_, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, uploadID, parts, ObjectOptions{
			MTime: objInfo.ModTime,
		})
		return err
	}
	hr, err := hash.NewReader(gr, objInfo.Size, "", "", objInfo.Size)
	if err != nil {
		return err
	}
	_, err = z.PutObject(ctx,
		bucket,
		objInfo.Name,
		NewPutObjReader(hr),
		ObjectOptions{
			VersionID:   objInfo.VersionID,
			MTime:       objInfo.ModTime,
			UserDefined: objInfo.UserDefined,
		})
	return err
}

// versionsSorter sorts FileInfo slices by version.
//msgp:ignore versionsSorter
type versionsSorter []FileInfo

func (v versionsSorter) reverse() {
	sort.Slice(v, func(i, j int) bool {
		return v[i].ModTime.Before(v[j].ModTime)
	})
}

func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
	var forwardTo string
	// If we resume to the same bucket, forward to last known item.
	rbucket, robject := z.poolMeta.ResumeBucketObject(idx)
	if rbucket != "" {
		if rbucket == bName {
			forwardTo = robject
		}
	}

	versioned := globalBucketVersioningSys.Enabled(bName)
	for _, set := range pool.sets {
		disks := set.getOnlineDisks()
		if len(disks) == 0 {
			logger.LogIf(GlobalContext, fmt.Errorf("no online disks found for set with endpoints %s",
				set.getEndpoints()))
			continue
		}

		decommissionEntry := func(entry metaCacheEntry) {
			if entry.isDir() {
				return
			}

			fivs, err := entry.fileInfoVersions(bName)
			if err != nil {
				return
			}

			// We need a reversed order for Decommissioning,
			// to create the appropriate stack.
			versionsSorter(fivs.Versions).reverse()

			for _, version := range fivs.Versions {
				// TODO: Skip transitioned objects for now.
				if version.IsRemote() {
					continue
				}
				// We will skip decommissioning delete markers
				// with single version, its as good as there
				// is no data associated with the object.
				if version.Deleted && len(fivs.Versions) == 1 {
					continue
				}
				if version.Deleted {
					_, err := z.DeleteObject(ctx,
						bName,
						version.Name,
						ObjectOptions{
							Versioned:         versioned,
							VersionID:         version.VersionID,
							MTime:             version.ModTime,
							DeleteReplication: version.ReplicationState,
						})
					if err != nil {
						logger.LogIf(ctx, err)
						z.poolMetaMutex.Lock()
						z.poolMeta.CountItem(idx, 0, true)
						z.poolMetaMutex.Unlock()
					} else {
						set.DeleteObject(ctx,
							bName,
							version.Name,
							ObjectOptions{
								VersionID: version.VersionID,
							})
						z.poolMetaMutex.Lock()
						z.poolMeta.CountItem(idx, 0, false)
						z.poolMetaMutex.Unlock()
					}
					continue
				}
				gr, err := set.GetObjectNInfo(ctx,
					bName,
					version.Name,
					nil,
					http.Header{},
					noLock, // all mutations are blocked reads are safe without locks.
					ObjectOptions{
						VersionID: version.VersionID,
					})
				if err != nil {
					logger.LogIf(ctx, err)
					z.poolMetaMutex.Lock()
					z.poolMeta.CountItem(idx, version.Size, true)
					z.poolMetaMutex.Unlock()
					continue
				}
				// gr.Close() is ensured by decommissionObject().
				if err = z.decommissionObject(ctx, bName, gr); err != nil {
					logger.LogIf(ctx, err)
					z.poolMetaMutex.Lock()
					z.poolMeta.CountItem(idx, version.Size, true)
					z.poolMetaMutex.Unlock()
					continue
				}
				set.DeleteObject(ctx,
					bName,
					version.Name,
					ObjectOptions{
						VersionID: version.VersionID,
					})
				z.poolMetaMutex.Lock()
				z.poolMeta.CountItem(idx, version.Size, false)
				z.poolMetaMutex.Unlock()
			}
			z.poolMetaMutex.Lock()
			z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name)
			logger.LogIf(ctx, z.poolMeta.updateAfter(ctx, idx, z.serverPools, time.Minute))
			z.poolMetaMutex.Unlock()
		}

		// How to resolve partial results.
		resolver := metadataResolutionParams{
			dirQuorum: set.defaultRQuorum(),
			objQuorum: set.defaultRQuorum(),
			bucket:    bName,
		}

		if err := listPathRaw(ctx, listPathRawOptions{
			disks:          disks,
			bucket:         bName,
			recursive:      true,
			forwardTo:      forwardTo,
			minDisks:       len(disks),
			reportNotFound: false,
			agreed:         decommissionEntry,
			partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
				entry, ok := entries.resolve(&resolver)
				if ok {
					decommissionEntry(*entry)
				}
			},
			finished: nil,
		}); err != nil {
			// Decommissioning failed and won't continue
			return err
		}
	}
	return nil
}

func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx int) error {
	pool := z.serverPools[idx]
	for _, bucket := range z.poolMeta.PendingBuckets(idx) {
		if z.poolMeta.isBucketDecommissioned(idx, bucket) {
			z.poolMetaMutex.Lock()
			z.poolMeta.BucketDone(idx, bucket) // remove from pendingBuckets and persist.
			z.poolMeta.save(ctx, z.serverPools)
			z.poolMetaMutex.Unlock()
			continue
		}
		if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
			return err
		}
		z.poolMetaMutex.Lock()
		z.poolMeta.BucketDone(idx, bucket)
		z.poolMeta.save(ctx, z.serverPools)
		z.poolMetaMutex.Unlock()
	}
	return nil
}

func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx int) {
	z.poolMetaMutex.Lock()
	var dctx context.Context
	dctx, z.decommissionCancelers[idx] = context.WithCancel(GlobalContext)
	z.poolMetaMutex.Unlock()

	if err := z.decommissionInBackground(dctx, idx); err != nil {
		logger.LogIf(GlobalContext, err)
		logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
		return
	}
	// Complete the decommission..
	logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx))
}

// Decommission - start decommission session.
func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error {
	if idx < 0 {
		return errInvalidArgument
	}

	if z.SinglePool() {
		return errInvalidArgument
	}

	// Make pool unwritable before decommissioning.
	if err := z.StartDecommission(ctx, idx); err != nil {
		return err
	}

	go z.doDecommissionInRoutine(ctx, idx)

	// Successfully started decommissioning.
	return nil
}

func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, error) {
	if idx < 0 {
		return PoolStatus{}, errInvalidArgument
	}

	z.poolMetaMutex.RLock()
	defer z.poolMetaMutex.RUnlock()

	if idx+1 > len(z.poolMeta.Pools) {
		return PoolStatus{}, errInvalidArgument
	}

	pool := z.serverPools[idx]
	info, _ := pool.StorageInfo(ctx)
	info.Backend.Type = madmin.Erasure

	scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
	if scParity <= 0 {
		scParity = z.serverPools[0].defaultParityCount
	}
	rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
	info.Backend.StandardSCData = append(info.Backend.StandardSCData, pool.SetDriveCount()-scParity)
	info.Backend.RRSCData = append(info.Backend.RRSCData, pool.SetDriveCount()-rrSCParity)
	info.Backend.StandardSCParity = scParity
	info.Backend.RRSCParity = rrSCParity

	currentSize := TotalUsableCapacityFree(info)

	poolInfo := z.poolMeta.Pools[idx]
	if poolInfo.Decommission != nil {
		poolInfo.Decommission.CurrentSize = currentSize
	} else {
		poolInfo.Decommission = &PoolDecommissionInfo{
			CurrentSize: currentSize,
			TotalSize:   TotalUsableCapacity(info),
		}
	}
	return poolInfo, nil
}

func (z *erasureServerPools) ReloadPoolMeta(ctx context.Context) (err error) {
	meta := poolMeta{}

	if _, err = meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
		return err
	}

	z.poolMetaMutex.Lock()
	defer z.poolMetaMutex.Unlock()

	z.poolMeta = meta
	return nil
}

func (z *erasureServerPools) DecommissionCancel(ctx context.Context, idx int) (err error) {
	if idx < 0 {
		return errInvalidArgument
	}

	if z.SinglePool() {
		return errInvalidArgument
	}

	z.poolMetaMutex.Lock()
	defer z.poolMetaMutex.Unlock()

	if z.poolMeta.DecommissionCancel(idx) {
		z.decommissionCancelers[idx]() // cancel any active thread.
		if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
			return err
		}
		globalNotificationSys.ReloadPoolMeta(ctx)
	}
	return nil
}

func (z *erasureServerPools) DecommissionFailed(ctx context.Context, idx int) (err error) {
	if idx < 0 {
		return errInvalidArgument
	}

	if z.SinglePool() {
		return errInvalidArgument
	}

	z.poolMetaMutex.Lock()
	defer z.poolMetaMutex.Unlock()

	if z.poolMeta.DecommissionFailed(idx) {
		z.decommissionCancelers[idx]() // cancel any active thread.
		if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
			return err
		}
		globalNotificationSys.ReloadPoolMeta(ctx)
	}
	return nil
}

func (z *erasureServerPools) CompleteDecommission(ctx context.Context, idx int) (err error) {
	if idx < 0 {
		return errInvalidArgument
	}

	if z.SinglePool() {
		return errInvalidArgument
	}

	z.poolMetaMutex.Lock()
	defer z.poolMetaMutex.Unlock()

	if z.poolMeta.DecommissionComplete(idx) {
		if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
			return err
		}
		globalNotificationSys.ReloadPoolMeta(ctx)
	}
	return nil
}

func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (err error) {
	if idx < 0 {
		return errInvalidArgument
	}

	if z.SinglePool() {
		return errInvalidArgument
	}

	buckets, err := z.ListBuckets(ctx)
	if err != nil {
		return err
	}

	// TODO: Support decommissioning transition tiers.
	for _, bucket := range buckets {
		if lc, err := globalLifecycleSys.Get(bucket.Name); err == nil {
			if lc.HasTransition() {
				return fmt.Errorf("Bucket is part of transitioned tier %s: decommission is not allowed in Tier'd setups", bucket.Name)
			}
		}
	}

	// Buckets data are dispersed in multiple zones/sets, make
	// sure to decommission the necessary metadata.
	buckets = append(buckets, BucketInfo{
		Name: pathJoin(minioMetaBucket, minioConfigPrefix),
	}, BucketInfo{
		Name: pathJoin(minioMetaBucket, bucketMetaPrefix),
	})

	var pool *erasureSets
	for pidx := range z.serverPools {
		if pidx == idx {
			pool = z.serverPools[idx]
			break
		}
	}

	if pool == nil {
		return errInvalidArgument
	}

	info, _ := pool.StorageInfo(ctx)
	info.Backend.Type = madmin.Erasure

	scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
	if scParity <= 0 {
		scParity = z.serverPools[0].defaultParityCount
	}
	rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
	info.Backend.StandardSCData = append(info.Backend.StandardSCData, pool.SetDriveCount()-scParity)
	info.Backend.RRSCData = append(info.Backend.RRSCData, pool.SetDriveCount()-rrSCParity)
	info.Backend.StandardSCParity = scParity
	info.Backend.RRSCParity = rrSCParity

	z.poolMetaMutex.Lock()
	defer z.poolMetaMutex.Unlock()

	if err = z.poolMeta.Decommission(idx, info); err != nil {
		return err
	}
	z.poolMeta.QueueBuckets(idx, buckets)
	if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
		return err
	}
	globalNotificationSys.ReloadPoolMeta(ctx)
	return nil
}
```
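For reference, `poolMeta.save()` above persists the pool metadata as `pool.bin`, prefixed by a 4-byte header: a little-endian uint16 format and a little-endian uint16 version, followed by the msgp-encoded payload, which `poolMeta.load()` validates before parsing. A minimal standalone sketch of that layout (the payload bytes here are a placeholder, not real msgp output):

```go
// Standalone sketch of the pool.bin layout used by poolMeta.save()/load():
// [0:2] uint16 LE format, [2:4] uint16 LE version, [4:] msgp payload.
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	poolMetaFormat  = 1 // same constants as in the commit
	poolMetaVersion = 1
)

func main() {
	payload := []byte("msgp-encoded poolMeta goes here") // placeholder for p.MarshalMsg(data)

	// Write side, mirroring poolMeta.save().
	data := make([]byte, 4, 4+len(payload))
	binary.LittleEndian.PutUint16(data[0:2], poolMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], poolMetaVersion)
	data = append(data, payload...)

	// Read side, mirroring poolMeta.load(): validate the header before parsing.
	if len(data) <= 4 {
		fmt.Println("poolMeta: no data")
		return
	}
	format := binary.LittleEndian.Uint16(data[0:2])
	version := binary.LittleEndian.Uint16(data[2:4])
	fmt.Printf("format=%d version=%d payload=%d bytes\n", format, version, len(data[4:]))
}
```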