mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
d2f5c3621f
current decommission traces were missing for - Skipped ILM expired versions - Skipped single DELETE marked version - A success or failure in decommissioning DELETE marker - allow additional info to be shared in DecomStatus() API
1381 lines
37 KiB
Go
1381 lines
37 KiB
Go
// Copyright (c) 2015-2023 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net/http"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/minio/madmin-go/v3"
|
|
"github.com/minio/minio/internal/hash"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/console"
|
|
"github.com/minio/pkg/env"
|
|
"github.com/minio/pkg/workers"
|
|
)
|
|
|
|
// PoolDecommissionInfo holds the progress and outcome of decommissioning a
// single pool. It is persisted as part of poolMeta (msg tags) and partially
// exposed over JSON (fields tagged `json:"-"` are internal only).
type PoolDecommissionInfo struct {
	// StartTime is when the decommission was started.
	StartTime time.Time `json:"startTime" msg:"st"`
	// StartSize and CurrentSize are seeded from the pool's free space and
	// TotalSize from its total space when the decommission is started.
	StartSize   int64 `json:"startSize" msg:"ss"`
	TotalSize   int64 `json:"totalSize" msg:"ts"`
	CurrentSize int64 `json:"currentSize" msg:"cs"`

	// Terminal state flags; none set means the decommission is in progress.
	Complete bool `json:"complete" msg:"cmp"`
	Failed   bool `json:"failed" msg:"fl"`
	Canceled bool `json:"canceled" msg:"cnl"`

	// Internal information.
	QueuedBuckets         []string `json:"-" msg:"bkts"`
	DecommissionedBuckets []string `json:"-" msg:"dbkts"`

	// Last bucket/object decommissioned.
	Bucket string `json:"-" msg:"bkt"`
	// Captures prefix that is currently being
	// decommissioned inside the 'Bucket'
	Prefix string `json:"-" msg:"pfx"`
	Object string `json:"-" msg:"obj"`

	// Verbose information
	ItemsDecommissioned     int64 `json:"objectsDecommissioned" msg:"id"`
	ItemsDecommissionFailed int64 `json:"objectsDecommissionedFailed" msg:"idf"`
	BytesDone               int64 `json:"bytesDecommissioned" msg:"bd"`
	BytesFailed             int64 `json:"bytesDecommissionedFailed" msg:"bf"`
}

// bucketPop should be called when a bucket is done decommissioning.
// Adds the bucket to the list of decommissioned buckets and updates resume numbers.
//
// The bucket is recorded at most once in DecommissionedBuckets, so calling
// bucketPop repeatedly for the same bucket (for example when a finished bucket
// is encountered again on resume) does not grow the persisted list.
func (pd *PoolDecommissionInfo) bucketPop(bucket string) {
	if !pd.isBucketDecommissioned(bucket) {
		pd.DecommissionedBuckets = append(pd.DecommissionedBuckets, bucket)
	}
	for i, b := range pd.QueuedBuckets {
		if b == bucket {
			// Bucket is done.
			pd.QueuedBuckets = append(pd.QueuedBuckets[:i], pd.QueuedBuckets[i+1:]...)
			// Clear tracker info.
			if pd.Bucket == bucket {
				pd.Bucket = "" // empty this out for next bucket
				pd.Prefix = "" // empty this out for the next bucket
				pd.Object = "" // empty this out for next object
			}
			return
		}
	}
}

// isBucketDecommissioned reports whether bucket is already recorded in
// DecommissionedBuckets.
func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
	for _, b := range pd.DecommissionedBuckets {
		if b == bucket {
			return true
		}
	}
	return false
}
|
|
|
|
func (pd *PoolDecommissionInfo) bucketPush(bucket decomBucketInfo) {
|
|
for _, b := range pd.QueuedBuckets {
|
|
if pd.isBucketDecommissioned(b) {
|
|
return
|
|
}
|
|
if b == bucket.String() {
|
|
return
|
|
}
|
|
}
|
|
pd.QueuedBuckets = append(pd.QueuedBuckets, bucket.String())
|
|
pd.Bucket = bucket.Name
|
|
pd.Prefix = bucket.Prefix
|
|
}
|
|
|
|
// PoolStatus captures current pool status
type PoolStatus struct {
	// ID is the index of the pool among the configured server pools.
	ID int `json:"id" msg:"id"`
	// CmdLine is the command-line spec of the pool; it is used as the key
	// to match remembered pools against the currently specified ones.
	CmdLine string `json:"cmdline" msg:"cl"`
	// LastUpdate is the last time this status was modified/persisted.
	LastUpdate time.Time `json:"lastUpdate" msg:"lu"`
	// Decommission is nil until a decommission has been started on the pool.
	Decommission *PoolDecommissionInfo `json:"decommissionInfo,omitempty" msg:"dec"`
}
|
|
|
|
//go:generate msgp -file $GOFILE -unexported

// poolMeta is the msgp-serialized payload stored in 'pool.bin': a format
// version plus the status of every known pool.
type poolMeta struct {
	Version int          `msg:"v"`
	Pools   []PoolStatus `msg:"pls"`
}
|
|
|
|
// A decommission resumable tells us if decommission is worth
|
|
// resuming upon restart of a cluster.
|
|
func (p *poolMeta) returnResumablePools() []PoolStatus {
|
|
var newPools []PoolStatus
|
|
for _, pool := range p.Pools {
|
|
if pool.Decommission == nil {
|
|
continue
|
|
}
|
|
if pool.Decommission.Complete || pool.Decommission.Canceled {
|
|
// Do not resume decommission upon startup for
|
|
// - decommission complete
|
|
// - decommission canceled
|
|
continue
|
|
} // In all other situations we need to resume
|
|
newPools = append(newPools, pool)
|
|
}
|
|
return newPools
|
|
}
|
|
|
|
func (p *poolMeta) DecommissionComplete(idx int) bool {
|
|
if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.Complete {
|
|
p.Pools[idx].LastUpdate = UTCNow()
|
|
p.Pools[idx].Decommission.Complete = true
|
|
p.Pools[idx].Decommission.Failed = false
|
|
p.Pools[idx].Decommission.Canceled = false
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (p *poolMeta) DecommissionFailed(idx int) bool {
|
|
if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.Failed {
|
|
p.Pools[idx].LastUpdate = UTCNow()
|
|
p.Pools[idx].Decommission.StartTime = time.Time{}
|
|
p.Pools[idx].Decommission.Complete = false
|
|
p.Pools[idx].Decommission.Failed = true
|
|
p.Pools[idx].Decommission.Canceled = false
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (p *poolMeta) DecommissionCancel(idx int) bool {
|
|
if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.Canceled {
|
|
p.Pools[idx].LastUpdate = UTCNow()
|
|
p.Pools[idx].Decommission.StartTime = time.Time{}
|
|
p.Pools[idx].Decommission.Complete = false
|
|
p.Pools[idx].Decommission.Failed = false
|
|
p.Pools[idx].Decommission.Canceled = true
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (p poolMeta) isBucketDecommissioned(idx int, bucket string) bool {
|
|
return p.Pools[idx].Decommission.isBucketDecommissioned(bucket)
|
|
}
|
|
|
|
func (p *poolMeta) BucketDone(idx int, bucket decomBucketInfo) {
|
|
if p.Pools[idx].Decommission == nil {
|
|
// Decommission not in progress.
|
|
return
|
|
}
|
|
p.Pools[idx].Decommission.bucketPop(bucket.String())
|
|
}
|
|
|
|
func (p poolMeta) ResumeBucketObject(idx int) (bucket, object string) {
|
|
if p.Pools[idx].Decommission != nil {
|
|
bucket = p.Pools[idx].Decommission.Bucket
|
|
object = p.Pools[idx].Decommission.Object
|
|
}
|
|
return
|
|
}
|
|
|
|
func (p *poolMeta) TrackCurrentBucketObject(idx int, bucket string, object string) {
|
|
if p.Pools[idx].Decommission == nil {
|
|
// Decommission not in progress.
|
|
return
|
|
}
|
|
p.Pools[idx].Decommission.Bucket = bucket
|
|
p.Pools[idx].Decommission.Object = object
|
|
}
|
|
|
|
func (p *poolMeta) PendingBuckets(idx int) []decomBucketInfo {
|
|
if p.Pools[idx].Decommission == nil {
|
|
// Decommission not in progress.
|
|
return nil
|
|
}
|
|
|
|
decomBuckets := make([]decomBucketInfo, len(p.Pools[idx].Decommission.QueuedBuckets))
|
|
for i := range decomBuckets {
|
|
bucket, prefix := path2BucketObject(p.Pools[idx].Decommission.QueuedBuckets[i])
|
|
decomBuckets[i] = decomBucketInfo{
|
|
Name: bucket,
|
|
Prefix: prefix,
|
|
}
|
|
}
|
|
|
|
return decomBuckets
|
|
}
|
|
|
|
//msgp:ignore decomBucketInfo

// decomBucketInfo identifies one unit of decommission work: a bucket and an
// optional prefix inside it.
type decomBucketInfo struct {
	Name   string
	Prefix string
}
|
|
|
|
func (db decomBucketInfo) String() string {
|
|
return pathJoin(db.Name, db.Prefix)
|
|
}
|
|
|
|
func (p *poolMeta) QueueBuckets(idx int, buckets []decomBucketInfo) {
|
|
// add new queued buckets
|
|
for _, bucket := range buckets {
|
|
p.Pools[idx].Decommission.bucketPush(bucket)
|
|
}
|
|
}
|
|
|
|
var (
	// errDecommissionAlreadyRunning is returned when a decommission is started
	// on a pool that already has one in progress.
	errDecommissionAlreadyRunning = errors.New("decommission is already in progress")
	// errDecommissionComplete signals that the pool has already been fully
	// decommissioned.
	errDecommissionComplete = errors.New("decommission is complete, please remove the servers from command-line")
)
|
|
|
|
func (p *poolMeta) Decommission(idx int, pi poolSpaceInfo) error {
|
|
// Return an error when there is decommission on going - the user needs
|
|
// to explicitly cancel it first in order to restart decommissioning again.
|
|
if p.Pools[idx].Decommission != nil &&
|
|
!p.Pools[idx].Decommission.Complete &&
|
|
!p.Pools[idx].Decommission.Failed &&
|
|
!p.Pools[idx].Decommission.Canceled {
|
|
return errDecommissionAlreadyRunning
|
|
}
|
|
|
|
now := UTCNow()
|
|
p.Pools[idx].LastUpdate = now
|
|
p.Pools[idx].Decommission = &PoolDecommissionInfo{
|
|
StartTime: now,
|
|
StartSize: pi.Free,
|
|
CurrentSize: pi.Free,
|
|
TotalSize: pi.Total,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p poolMeta) IsSuspended(idx int) bool {
|
|
return p.Pools[idx].Decommission != nil
|
|
}
|
|
|
|
func (p *poolMeta) validate(pools []*erasureSets) (bool, error) {
|
|
type poolInfo struct {
|
|
position int
|
|
completed bool
|
|
decomStarted bool // started but not finished yet
|
|
}
|
|
|
|
rememberedPools := make(map[string]poolInfo)
|
|
for idx, pool := range p.Pools {
|
|
complete := false
|
|
decomStarted := false
|
|
if pool.Decommission != nil {
|
|
if pool.Decommission.Complete {
|
|
complete = true
|
|
}
|
|
decomStarted = true
|
|
}
|
|
rememberedPools[pool.CmdLine] = poolInfo{
|
|
position: idx,
|
|
completed: complete,
|
|
decomStarted: decomStarted,
|
|
}
|
|
}
|
|
|
|
specifiedPools := make(map[string]int)
|
|
for idx, pool := range pools {
|
|
specifiedPools[pool.endpoints.CmdLine] = idx
|
|
}
|
|
|
|
var update bool
|
|
// Check if specified pools need to be removed from decommissioned pool.
|
|
for k := range specifiedPools {
|
|
pi, ok := rememberedPools[k]
|
|
if !ok {
|
|
// we do not have the pool anymore that we previously remembered, since all
|
|
// the CLI checks out we can allow updates since we are mostly adding a pool here.
|
|
update = true
|
|
}
|
|
if ok && pi.completed {
|
|
return false, fmt.Errorf("pool(%s) = %s is decommissioned, please remove from server command line", humanize.Ordinal(pi.position+1), k)
|
|
}
|
|
}
|
|
|
|
if len(specifiedPools) == len(rememberedPools) {
|
|
for k, pi := range rememberedPools {
|
|
pos, ok := specifiedPools[k]
|
|
if ok && pos != pi.position {
|
|
update = true // pool order is changing, its okay to allow it.
|
|
}
|
|
}
|
|
}
|
|
|
|
if !update {
|
|
update = len(specifiedPools) != len(rememberedPools)
|
|
}
|
|
|
|
return update, nil
|
|
}
|
|
|
|
func (p *poolMeta) load(ctx context.Context, pool *erasureSets, pools []*erasureSets) error {
|
|
data, err := readConfig(ctx, pool, poolMetaName)
|
|
if err != nil {
|
|
if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
if len(data) == 0 {
|
|
// Seems to be empty create a new poolMeta object.
|
|
return nil
|
|
}
|
|
if len(data) <= 4 {
|
|
return fmt.Errorf("poolMeta: no data")
|
|
}
|
|
// Read header
|
|
switch binary.LittleEndian.Uint16(data[0:2]) {
|
|
case poolMetaFormat:
|
|
default:
|
|
return fmt.Errorf("poolMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
|
|
}
|
|
switch binary.LittleEndian.Uint16(data[2:4]) {
|
|
case poolMetaVersion:
|
|
default:
|
|
return fmt.Errorf("poolMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
|
|
}
|
|
|
|
// OK, parse data.
|
|
if _, err = p.UnmarshalMsg(data[4:]); err != nil {
|
|
return err
|
|
}
|
|
|
|
switch p.Version {
|
|
case poolMetaVersionV1:
|
|
default:
|
|
return fmt.Errorf("unexpected pool meta version: %d", p.Version)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *poolMeta) CountItem(idx int, size int64, failed bool) {
|
|
pd := p.Pools[idx].Decommission
|
|
if pd != nil {
|
|
if failed {
|
|
pd.ItemsDecommissionFailed++
|
|
pd.BytesFailed += size
|
|
} else {
|
|
pd.ItemsDecommissioned++
|
|
pd.BytesDone += size
|
|
}
|
|
p.Pools[idx].Decommission = pd
|
|
}
|
|
}
|
|
|
|
func (p *poolMeta) updateAfter(ctx context.Context, idx int, pools []*erasureSets, duration time.Duration) (bool, error) {
|
|
if p.Pools[idx].Decommission == nil {
|
|
return false, errInvalidArgument
|
|
}
|
|
now := UTCNow()
|
|
if now.Sub(p.Pools[idx].LastUpdate) >= duration {
|
|
if serverDebugLog {
|
|
console.Debugf("decommission: persisting poolMeta on drive: threshold:%s, poolMeta:%#v\n", now.Sub(p.Pools[idx].LastUpdate), p.Pools[idx])
|
|
}
|
|
p.Pools[idx].LastUpdate = now
|
|
if err := p.save(ctx, pools); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func (p poolMeta) save(ctx context.Context, pools []*erasureSets) error {
|
|
data := make([]byte, 4, p.Msgsize()+4)
|
|
|
|
// Initialize the header.
|
|
binary.LittleEndian.PutUint16(data[0:2], poolMetaFormat)
|
|
binary.LittleEndian.PutUint16(data[2:4], poolMetaVersion)
|
|
|
|
buf, err := p.MarshalMsg(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Saves on all pools to make sure decommissioning of first pool is allowed.
|
|
for i, eset := range pools {
|
|
if err = saveConfig(ctx, eset, poolMetaName, buf); err != nil {
|
|
if !errors.Is(err, context.Canceled) {
|
|
logger.LogIf(ctx, fmt.Errorf("saving pool.bin for pool index %d failed with: %v", i, err))
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
const (
	// poolMetaName is the config object name under which poolMeta is stored.
	poolMetaName = "pool.bin"
	// poolMetaFormat is written to the first two header bytes of 'pool.bin'.
	poolMetaFormat = 1
	// poolMetaVersionV1 is the first (and currently only) poolMeta version.
	poolMetaVersionV1 = 1
	// poolMetaVersion is the version written by save and expected by load.
	poolMetaVersion = poolMetaVersionV1
)
|
|
|
|
// Init() initializes pools and saves additional information about them
// in 'pool.bin', this is eventually used for decommissioning the pool.
//
// It loads rebalance metadata and starts the rebalance routine, loads and
// validates the stored pool metadata against the configured pools, rewrites
// 'pool.bin' when validation asks for an update, and finally resumes any
// decommission that was neither completed nor canceled. Resuming happens in a
// background goroutine and only when the first endpoint of the first resumable
// pool is local to this node.
func (z *erasureServerPools) Init(ctx context.Context) error {
	// Load rebalance metadata if present
	err := z.loadRebalanceMeta(ctx)
	if err != nil {
		return fmt.Errorf("failed to load rebalance data: %w", err)
	}

	// Start rebalance routine
	z.StartRebalance()

	meta := poolMeta{}
	if err := meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
		return err
	}

	update, err := meta.validate(z.serverPools)
	if err != nil {
		return err
	}

	// if no update is needed keep the loaded metadata as is.
	if !update {
		z.poolMeta = meta
	} else {
		newMeta := poolMeta{} // to update write poolMeta fresh.
		// looks like new pool was added we need to update,
		// or this is a fresh installation (or an existing
		// installation with pool removed)
		newMeta.Version = poolMetaVersion
		for idx, pool := range z.serverPools {
			var skip bool
			for _, currentPool := range meta.Pools {
				// Preserve any current pool status.
				if currentPool.CmdLine == pool.endpoints.CmdLine {
					newMeta.Pools = append(newMeta.Pools, currentPool)
					skip = true
					break
				}
			}
			if skip {
				continue
			}
			// Pool is not remembered yet, register it with a fresh status.
			newMeta.Pools = append(newMeta.Pools, PoolStatus{
				CmdLine:    pool.endpoints.CmdLine,
				ID:         idx,
				LastUpdate: UTCNow(),
			})
		}
		if err = newMeta.save(ctx, z.serverPools); err != nil {
			return err
		}
		z.poolMeta = newMeta
	}

	// Resumable pools are computed from the metadata as loaded from disk.
	pools := meta.returnResumablePools()
	poolIndices := make([]int, 0, len(pools))
	for _, pool := range pools {
		idx := globalEndpoints.GetPoolIdx(pool.CmdLine)
		if idx == -1 {
			return fmt.Errorf("unexpected state present for decommission status pool(%s) not found", pool.CmdLine)
		}
		poolIndices = append(poolIndices, idx)
	}

	if len(poolIndices) > 0 && globalEndpoints[poolIndices[0]].Endpoints[0].IsLocal {
		go func() {
			// Private random source used to jitter the retry delay below.
			r := rand.New(rand.NewSource(time.Now().UnixNano()))
			for {
				if err := z.Decommission(ctx, poolIndices...); err != nil {
					if errors.Is(err, errDecommissionAlreadyRunning) {
						// A previous decommission running found restart it.
						for _, idx := range poolIndices {
							z.doDecommissionInRoutine(ctx, idx)
						}
						return
					}
					if configRetriableErrors(err) {
						logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pools %v: %w: retrying..", pools, err))
						// Back off between one and six seconds before retrying.
						time.Sleep(time.Second + time.Duration(r.Float64()*float64(5*time.Second)))
						continue
					}
					logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pools, err))
					return
				}
				// NOTE(review): a nil error loops straight back into
				// z.Decommission(); presumably the next call reports
				// errDecommissionAlreadyRunning - confirm against Decommission().
			}
		}()
	}

	return nil
}
|
|
|
|
func (z *erasureServerPools) IsDecommissionRunning() bool {
|
|
z.poolMetaMutex.RLock()
|
|
defer z.poolMetaMutex.RUnlock()
|
|
meta := z.poolMeta
|
|
for _, pool := range meta.Pools {
|
|
if pool.Decommission != nil &&
|
|
!pool.Decommission.Complete &&
|
|
!pool.Decommission.Failed &&
|
|
!pool.Decommission.Canceled {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// decommissionObject copies the object version served by gr into bucket
// through the regular z write path (PutObject, or a multipart upload when the
// source object is multipart), preserving version ID, mod time, user metadata,
// ETags and part indexes.
//
// gr is always closed and the outcome is audit logged as "DecomCopyData" when
// the function returns, whatever the result.
func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
	objInfo := gr.ObjInfo

	// The named result err is what gets audit logged here.
	defer func() {
		gr.Close()
		auditLogDecom(ctx, "DecomCopyData", objInfo.Bucket, objInfo.Name, objInfo.VersionID, err)
	}()

	actualSize, err := objInfo.GetActualSize()
	if err != nil {
		return err
	}

	if objInfo.isMultipart() {
		res, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{
			VersionID:   objInfo.VersionID,
			UserDefined: objInfo.UserDefined,
		})
		if err != nil {
			return fmt.Errorf("decommissionObject: NewMultipartUpload() %w", err)
		}
		// The abort is issued on every return path, including after a
		// successful CompleteMultipartUpload().
		defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, ObjectOptions{})
		parts := make([]CompletePart, len(objInfo.Parts))
		for i, part := range objInfo.Parts {
			// Parts are read back to back from gr, each limited to its stored size.
			hr, err := hash.NewReader(io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
			if err != nil {
				return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
			}
			pi, err := z.PutObjectPart(ctx, bucket, objInfo.Name, res.UploadID,
				part.Number,
				NewPutObjReader(hr),
				ObjectOptions{
					PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
					IndexCB: func() []byte {
						return part.Index // Preserve part Index to ensure decompression works.
					},
				})
			if err != nil {
				return fmt.Errorf("decommissionObject: PutObjectPart() %w", err)
			}
			parts[i] = CompletePart{
				ETag:           pi.ETag,
				PartNumber:     pi.PartNumber,
				ChecksumCRC32:  pi.ChecksumCRC32,
				ChecksumCRC32C: pi.ChecksumCRC32C,
				ChecksumSHA256: pi.ChecksumSHA256,
				ChecksumSHA1:   pi.ChecksumSHA1,
			}
		}
		_, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, parts, ObjectOptions{
			MTime: objInfo.ModTime,
		})
		if err != nil {
			err = fmt.Errorf("decommissionObject: CompleteMultipartUpload() %w", err)
		}
		return err
	}

	// Single part object: stream it through one PutObject call.
	hr, err := hash.NewReader(io.LimitReader(gr, objInfo.Size), objInfo.Size, "", "", actualSize)
	if err != nil {
		return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
	}
	_, err = z.PutObject(ctx,
		bucket,
		objInfo.Name,
		NewPutObjReader(hr),
		ObjectOptions{
			VersionID:    objInfo.VersionID,
			MTime:        objInfo.ModTime,
			UserDefined:  objInfo.UserDefined,
			PreserveETag: objInfo.ETag, // Preserve original ETag to ensure same metadata.
			IndexCB: func() []byte {
				// NOTE(review): assumes objInfo.Parts is non-empty whenever
				// this callback is invoked - confirm against PutObject().
				return objInfo.Parts[0].Index // Preserve part Index to ensure decompression works.
			},
		})
	if err != nil {
		err = fmt.Errorf("decommissionObject: PutObject() %w", err)
	}
	return err
}
|
|
|
|
// versionsSorter sorts FileInfo slices by version.
|
|
//
|
|
//msgp:ignore versionsSorter
|
|
type versionsSorter []FileInfo
|
|
|
|
func (v versionsSorter) reverse() {
|
|
sort.Slice(v, func(i, j int) bool {
|
|
return v[i].ModTime.Before(v[j].ModTime)
|
|
})
|
|
}
|
|
|
|
func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi decomBucketInfo, fn func(entry metaCacheEntry)) error {
|
|
disks := set.getOnlineDisks()
|
|
if len(disks) == 0 {
|
|
return fmt.Errorf("no online drives found for set with endpoints %s", set.getEndpoints())
|
|
}
|
|
|
|
// How to resolve partial results.
|
|
resolver := metadataResolutionParams{
|
|
dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
|
objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
|
bucket: bi.Name,
|
|
}
|
|
|
|
err := listPathRaw(ctx, listPathRawOptions{
|
|
disks: disks,
|
|
bucket: bi.Name,
|
|
path: bi.Prefix,
|
|
recursive: true,
|
|
forwardTo: "",
|
|
minDisks: len(disks) / 2, // to capture all quorum ratios
|
|
reportNotFound: false,
|
|
agreed: fn,
|
|
partial: func(entries metaCacheEntries, _ []error) {
|
|
entry, ok := entries.resolve(&resolver)
|
|
if ok {
|
|
fn(*entry)
|
|
}
|
|
},
|
|
finished: nil,
|
|
})
|
|
return err
|
|
}
|
|
|
|
// decommissionPool moves everything stored under bucket/prefix bi on pool
// (index idx) to the remaining pools.
//
// Every erasure set of the pool is listed concurrently; each listed entry is
// handled by its own worker which processes the entry's versions oldest first:
// ILM expired versions and lone delete markers are skipped, delete markers are
// recreated through z.DeleteObject, tiered versions through
// z.DecomTieredObject and regular data is copied through decommissionObject.
// Once all versions of an entry are accounted for, the entry is removed from
// the source set. Progress is counted in z.poolMeta and persisted at most
// every 30 seconds.
//
// The number of workers defaults to the number of sets and can be overridden
// with _MINIO_DECOMMISSION_WORKERS. An error is returned only when the worker
// pool cannot be set up; listing and per-object failures are logged/counted.
func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bi decomBucketInfo) error {
	ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})

	wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets)))
	workerSize, err := strconv.Atoi(wStr)
	if err != nil {
		return err
	}

	// each set get its own thread separate from the concurrent
	// objects/versions being decommissioned.
	workerSize += len(pool.sets)

	wk, err := workers.New(workerSize)
	if err != nil {
		return err
	}

	vc, _ := globalBucketVersioningSys.Get(bi.Name)

	// Check if the current bucket has a configured lifecycle policy
	lc, _ := globalLifecycleSys.Get(bi.Name)

	// Check if bucket is object locked.
	lr, _ := globalBucketObjectLockSys.Get(bi.Name)

	for _, set := range pool.sets {
		set := set

		// filterLifecycle reports whether the version is expired by ILM and
		// therefore does not need to be moved; the expiry itself is queued.
		filterLifecycle := func(bucket, object string, fi FileInfo) bool {
			if lc == nil {
				return false
			}
			versioned := vc != nil && vc.Versioned(object)
			objInfo := fi.ToObjectInfo(bucket, object, versioned)

			evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
			switch {
			case evt.Action.DeleteRestored(): // if restored copy has expired,delete it synchronously
				applyExpiryOnTransitionedObject(ctx, z, objInfo, evt, lcEventSrc_Decom)
				return false
			case evt.Action.Delete():
				globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_Decom)
				return true
			default:
				return false
			}
		}

		// decommissionEntry handles all versions of a single listed entry. It
		// runs on a worker slot taken by the lister and gives it back on return.
		decommissionEntry := func(entry metaCacheEntry) {
			defer wk.Give()

			if entry.isDir() {
				return
			}

			fivs, err := entry.fileInfoVersions(bi.Name)
			if err != nil {
				return
			}

			// We need a reversed order for decommissioning,
			// to create the appropriate stack.
			versionsSorter(fivs.Versions).reverse()

			// decommissioned counts the versions that no longer need to stay on
			// this pool (moved or skipped); expired counts the ILM expired ones.
			var decommissioned, expired int
			for _, version := range fivs.Versions {
				stopFn := globalDecommissionMetrics.log(decomMetricDecommissionObject, idx, bi.Name, version.Name, version.VersionID)
				// Apply lifecycle rules on the objects that are expired.
				if filterLifecycle(bi.Name, version.Name, version) {
					expired++
					decommissioned++
					stopFn(errors.New("ILM expired object/version will be skipped"))
					continue
				}

				// any object with only single DEL marker we don't need
				// to decommission, just skip it, this also includes
				// any other versions that have already expired.
				remainingVersions := len(fivs.Versions) - expired
				if version.Deleted && remainingVersions == 1 {
					decommissioned++
					stopFn(errors.New("DELETE marked object with no other non-current versions will be skipped"))
					continue
				}

				versionID := version.VersionID
				if versionID == "" {
					versionID = nullVersionID
				}

				if version.Deleted {
					_, err := z.DeleteObject(ctx,
						bi.Name,
						version.Name,
						ObjectOptions{
							// Since we are preserving a delete marker, we have to make sure this is always true.
							// regardless of the current configuration of the bucket we must preserve all versions
							// on the pool being decommissioned.
							Versioned:          true,
							VersionID:          versionID,
							MTime:              version.ModTime,
							DeleteReplication:  version.ReplicationState,
							DeleteMarker:       true, // make sure we create a delete marker
							SkipDecommissioned: true, // make sure we skip the decommissioned pool
						})
					var failure bool
					if err != nil {
						// A missing object/version is not treated as a failure.
						if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
							err = nil
						}
					}
					stopFn(err)
					if err != nil {
						logger.LogIf(ctx, err)
						failure = true
					}
					z.poolMetaMutex.Lock()
					z.poolMeta.CountItem(idx, 0, failure)
					z.poolMetaMutex.Unlock()
					if !failure {
						// Success keep a count.
						decommissioned++
					}
					continue
				}

				var failure, ignore bool
				// gr.Close() is ensured by decommissionObject().
				// Each version gets up to three attempts.
				for try := 0; try < 3; try++ {
					if version.IsRemote() {
						if err := z.DecomTieredObject(ctx, bi.Name, version.Name, version, ObjectOptions{
							VersionID:   versionID,
							MTime:       version.ModTime,
							UserDefined: version.Metadata,
						}); err != nil {
							stopFn(err)
							failure = true
							logger.LogIf(ctx, err)
							continue
						}
						stopFn(nil)
						failure = false
						break
					}
					gr, err := set.GetObjectNInfo(ctx,
						bi.Name,
						encodeDirObject(version.Name),
						nil,
						http.Header{},
						ObjectOptions{
							VersionID:    versionID,
							NoDecryption: true,
							NoLock:       true,
						})
					if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
						// object deleted by the application, nothing to do here we move on.
						ignore = true
						stopFn(nil)
						break
					}
					if err != nil && !ignore {
						// if usage-cache.bin is not readable log and ignore it.
						if bi.Name == minioMetaBucket && strings.Contains(version.Name, dataUsageCacheName) {
							ignore = true
							stopFn(err)
							logger.LogIf(ctx, err)
							break
						}
					}
					if err != nil {
						failure = true
						logger.LogIf(ctx, err)
						stopFn(err)
						continue
					}
					if err = z.decommissionObject(ctx, bi.Name, gr); err != nil {
						stopFn(err)
						failure = true
						logger.LogIf(ctx, err)
						continue
					}
					stopFn(nil)
					failure = false
					break
				}
				// Ignored versions are neither counted in the pool metadata nor
				// as decommissioned.
				if ignore {
					continue
				}
				z.poolMetaMutex.Lock()
				z.poolMeta.CountItem(idx, version.Size, failure)
				z.poolMetaMutex.Unlock()
				if failure {
					break // break out on first error
				}
				decommissioned++
			}

			// if all versions were decommissioned, then we can delete the object versions.
			if decommissioned == len(fivs.Versions) {
				stopFn := globalDecommissionMetrics.log(decomMetricDecommissionRemoveObject, idx, bi.Name, entry.name)
				_, err := set.DeleteObject(ctx,
					bi.Name,
					encodeDirObject(entry.name),
					ObjectOptions{
						DeletePrefix: true, // use prefix delete to delete all versions at once.
					},
				)
				stopFn(err)
				auditLogDecom(ctx, "DecomDeleteObject", bi.Name, entry.name, "", err)
				if err != nil {
					logger.LogIf(ctx, err)
				}
			}
			// Remember where we are and persist the metadata if the last save
			// is at least 30 seconds old; peers are told to reload after a save.
			z.poolMetaMutex.Lock()
			z.poolMeta.TrackCurrentBucketObject(idx, bi.Name, entry.name)
			ok, err := z.poolMeta.updateAfter(ctx, idx, z.serverPools, 30*time.Second)
			logger.LogIf(ctx, err)
			if ok {
				globalNotificationSys.ReloadPoolMeta(ctx)
			}
			z.poolMetaMutex.Unlock()
		}

		// One worker slot per set is used by the lister itself; every listed
		// entry then takes its own slot before being processed.
		wk.Take()
		go func() {
			defer wk.Give()
			err := set.listObjectsToDecommission(ctx, bi,
				func(entry metaCacheEntry) {
					wk.Take()
					go decommissionEntry(entry)
				},
			)
			logger.LogIf(ctx, err)
		}()
	}
	// Wait for all listers and entry workers to finish.
	wk.Wait()
	return nil
}
|
|
|
|
//msgp:ignore decomMetrics

// decomMetrics publishes decommission trace events; it carries no state.
type decomMetrics struct{}

// globalDecommissionMetrics is the shared decomMetrics instance used by the
// decommission code paths.
var globalDecommissionMetrics decomMetrics
|
|
|
|
//msgp:ignore decomMetric
//go:generate stringer -type=decomMetric -trimprefix=decomMetric $GOFILE

// decomMetric identifies the kind of decommission operation being traced.
type decomMetric uint8

const (
	// decomMetricDecommissionBucket traces decommissioning of a whole bucket.
	decomMetricDecommissionBucket decomMetric = iota
	// decomMetricDecommissionObject traces handling of a single object version.
	decomMetricDecommissionObject
	// decomMetricDecommissionRemoveObject traces removal of an entry from the
	// source set once all its versions are accounted for.
	decomMetricDecommissionRemoveObject
)
|
|
|
|
func decomTrace(d decomMetric, poolIdx int, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo {
|
|
var errStr string
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
return madmin.TraceInfo{
|
|
TraceType: madmin.TraceDecommission,
|
|
Time: startTime,
|
|
NodeName: globalLocalNodeName,
|
|
FuncName: fmt.Sprintf("decommission.%s (pool-id=%d)", d.String(), poolIdx),
|
|
Duration: duration,
|
|
Path: path,
|
|
Error: errStr,
|
|
}
|
|
}
|
|
|
|
func (m *decomMetrics) log(d decomMetric, poolIdx int, paths ...string) func(err error) {
|
|
startTime := time.Now()
|
|
return func(err error) {
|
|
duration := time.Since(startTime)
|
|
if globalTrace.NumSubscribers(madmin.TraceDecommission) > 0 {
|
|
globalTrace.Publish(decomTrace(d, poolIdx, startTime, duration, strings.Join(paths, " "), err))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx int) error {
|
|
pool := z.serverPools[idx]
|
|
for _, bucket := range z.poolMeta.PendingBuckets(idx) {
|
|
if z.poolMeta.isBucketDecommissioned(idx, bucket.String()) {
|
|
if serverDebugLog {
|
|
console.Debugln("decommission: already done, moving on", bucket)
|
|
}
|
|
|
|
z.poolMetaMutex.Lock()
|
|
z.poolMeta.BucketDone(idx, bucket) // remove from pendingBuckets and persist.
|
|
z.poolMeta.save(ctx, z.serverPools)
|
|
z.poolMetaMutex.Unlock()
|
|
continue
|
|
}
|
|
if serverDebugLog {
|
|
console.Debugln("decommission: currently on bucket", bucket.Name)
|
|
}
|
|
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionBucket, idx, bucket.Name)
|
|
if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
|
|
stopFn(err)
|
|
return err
|
|
}
|
|
stopFn(nil)
|
|
|
|
z.poolMetaMutex.Lock()
|
|
z.poolMeta.BucketDone(idx, bucket)
|
|
z.poolMeta.save(ctx, z.serverPools)
|
|
z.poolMetaMutex.Unlock()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (z *erasureServerPools) checkAfterDecom(ctx context.Context, idx int) error {
|
|
buckets, err := z.getBucketsToDecommission(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pool := z.serverPools[idx]
|
|
for _, set := range pool.sets {
|
|
for _, bi := range buckets {
|
|
vc, _ := globalBucketVersioningSys.Get(bi.Name)
|
|
|
|
// Check if the current bucket has a configured lifecycle policy
|
|
lc, _ := globalLifecycleSys.Get(bi.Name)
|
|
|
|
// Check if bucket is object locked.
|
|
lr, _ := globalBucketObjectLockSys.Get(bi.Name)
|
|
|
|
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
|
|
if lc == nil {
|
|
return false
|
|
}
|
|
versioned := vc != nil && vc.Versioned(object)
|
|
objInfo := fi.ToObjectInfo(bucket, object, versioned)
|
|
|
|
evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
|
|
switch {
|
|
case evt.Action.DeleteRestored(): // if restored copy has expired,delete it synchronously
|
|
applyExpiryOnTransitionedObject(ctx, z, objInfo, evt, lcEventSrc_Decom)
|
|
return false
|
|
case evt.Action.Delete():
|
|
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_Decom)
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
var versionsFound int
|
|
err := set.listObjectsToDecommission(ctx, bi, func(entry metaCacheEntry) {
|
|
if !entry.isObject() {
|
|
return
|
|
}
|
|
|
|
fivs, err := entry.fileInfoVersions(bi.Name)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// We need a reversed order for decommissioning,
|
|
// to create the appropriate stack.
|
|
versionsSorter(fivs.Versions).reverse()
|
|
|
|
for _, version := range fivs.Versions {
|
|
// Apply lifecycle rules on the objects that are expired.
|
|
if filterLifecycle(bi.Name, version.Name, version) {
|
|
continue
|
|
}
|
|
|
|
// `.usage-cache.bin` still exists, must be not readable ignore it.
|
|
if bi.Name == minioMetaBucket && strings.Contains(version.Name, dataUsageCacheName) {
|
|
// skipping bucket usage cache name, as its autogenerated.
|
|
continue
|
|
}
|
|
|
|
versionsFound++
|
|
}
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if versionsFound > 0 {
|
|
return fmt.Errorf("at least %d object(s)/version(s) were found in bucket `%s` after decommissioning", versionsFound, bi.Name)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx int) {
|
|
z.poolMetaMutex.Lock()
|
|
var dctx context.Context
|
|
dctx, z.decommissionCancelers[idx] = context.WithCancel(GlobalContext)
|
|
z.poolMetaMutex.Unlock()
|
|
|
|
// Generate an empty request info so it can be directly modified later by audit
|
|
dctx = logger.SetReqInfo(dctx, &logger.ReqInfo{})
|
|
|
|
if err := z.decommissionInBackground(dctx, idx); err != nil {
|
|
logger.LogIf(GlobalContext, err)
|
|
logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
|
|
return
|
|
}
|
|
|
|
z.poolMetaMutex.Lock()
|
|
failed := z.poolMeta.Pools[idx].Decommission.ItemsDecommissionFailed > 0 || contextCanceled(dctx)
|
|
poolCmdLine := z.poolMeta.Pools[idx].CmdLine
|
|
z.poolMetaMutex.Unlock()
|
|
|
|
if !failed {
|
|
logger.Info("Decommissioning complete for pool '%s', verifying for any pending objects", poolCmdLine)
|
|
err := z.checkAfterDecom(dctx, idx)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
failed = true
|
|
}
|
|
}
|
|
|
|
if failed {
|
|
// Decommission failed indicate as such.
|
|
logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
|
|
} else {
|
|
// Complete the decommission..
|
|
logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx))
|
|
}
|
|
}
|
|
|
|
func (z *erasureServerPools) IsSuspended(idx int) bool {
|
|
z.poolMetaMutex.RLock()
|
|
defer z.poolMetaMutex.RUnlock()
|
|
return z.poolMeta.IsSuspended(idx)
|
|
}
|
|
|
|
// Decommission - start decommission session.
|
|
func (z *erasureServerPools) Decommission(ctx context.Context, indices ...int) error {
|
|
if len(indices) == 0 {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
if z.SinglePool() {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
// Make pool unwritable before decommissioning.
|
|
if err := z.StartDecommission(ctx, indices...); err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
for _, idx := range indices {
|
|
// decommission all pools serially one after
|
|
// the other.
|
|
z.doDecommissionInRoutine(ctx, idx)
|
|
}
|
|
}()
|
|
|
|
// Successfully started decommissioning.
|
|
return nil
|
|
}
|
|
|
|
// decomError is an error type that carries its message as a plain string.
type decomError struct {
	// Err is the message returned by Error.
	Err string
}

// Error implements the error interface by returning the stored message.
func (e decomError) Error() string {
	msg := e.Err
	return msg
}
|
|
|
|
// poolSpaceInfo holds the capacity figures of a single pool as computed by
// getDecommissionPoolSpaceInfo: free and total usable capacity, and their
// difference as the used amount.
type poolSpaceInfo struct {
	Free, Total, Used int64
}
|
|
|
|
func (z *erasureServerPools) getDecommissionPoolSpaceInfo(idx int) (pi poolSpaceInfo, err error) {
|
|
if idx < 0 {
|
|
return pi, errInvalidArgument
|
|
}
|
|
if idx+1 > len(z.serverPools) {
|
|
return pi, errInvalidArgument
|
|
}
|
|
|
|
info := z.serverPools[idx].StorageInfo(context.Background())
|
|
info.Backend = z.BackendInfo()
|
|
|
|
usableTotal := int64(GetTotalUsableCapacity(info.Disks, info))
|
|
usableFree := int64(GetTotalUsableCapacityFree(info.Disks, info))
|
|
return poolSpaceInfo{
|
|
Total: usableTotal,
|
|
Free: usableFree,
|
|
Used: usableTotal - usableFree,
|
|
}, nil
|
|
}
|
|
|
|
func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, error) {
|
|
if idx < 0 {
|
|
return PoolStatus{}, errInvalidArgument
|
|
}
|
|
|
|
z.poolMetaMutex.RLock()
|
|
defer z.poolMetaMutex.RUnlock()
|
|
|
|
pi, err := z.getDecommissionPoolSpaceInfo(idx)
|
|
if err != nil {
|
|
return PoolStatus{}, err
|
|
}
|
|
|
|
poolInfo := z.poolMeta.Pools[idx]
|
|
if poolInfo.Decommission != nil {
|
|
poolInfo.Decommission.TotalSize = pi.Total
|
|
poolInfo.Decommission.CurrentSize = poolInfo.Decommission.StartSize + poolInfo.Decommission.BytesDone
|
|
} else {
|
|
poolInfo.Decommission = &PoolDecommissionInfo{
|
|
TotalSize: pi.Total,
|
|
CurrentSize: pi.Free,
|
|
}
|
|
}
|
|
return poolInfo, nil
|
|
}
|
|
|
|
func (z *erasureServerPools) ReloadPoolMeta(ctx context.Context) (err error) {
|
|
meta := poolMeta{}
|
|
|
|
if err = meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
|
|
return err
|
|
}
|
|
|
|
z.poolMetaMutex.Lock()
|
|
defer z.poolMetaMutex.Unlock()
|
|
|
|
z.poolMeta = meta
|
|
return nil
|
|
}
|
|
|
|
func (z *erasureServerPools) DecommissionCancel(ctx context.Context, idx int) (err error) {
|
|
if idx < 0 {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
if z.SinglePool() {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
z.poolMetaMutex.Lock()
|
|
defer z.poolMetaMutex.Unlock()
|
|
|
|
if z.poolMeta.DecommissionCancel(idx) {
|
|
if fn := z.decommissionCancelers[idx]; fn != nil {
|
|
defer fn() // cancel any active thread.
|
|
}
|
|
if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
|
|
return err
|
|
}
|
|
globalNotificationSys.ReloadPoolMeta(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (z *erasureServerPools) DecommissionFailed(ctx context.Context, idx int) (err error) {
|
|
if idx < 0 {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
if z.SinglePool() {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
z.poolMetaMutex.Lock()
|
|
defer z.poolMetaMutex.Unlock()
|
|
|
|
if z.poolMeta.DecommissionFailed(idx) {
|
|
if fn := z.decommissionCancelers[idx]; fn != nil {
|
|
defer fn() // cancel any active thread.
|
|
}
|
|
if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
|
|
return err
|
|
}
|
|
globalNotificationSys.ReloadPoolMeta(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (z *erasureServerPools) CompleteDecommission(ctx context.Context, idx int) (err error) {
|
|
if idx < 0 {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
if z.SinglePool() {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
z.poolMetaMutex.Lock()
|
|
defer z.poolMetaMutex.Unlock()
|
|
|
|
if z.poolMeta.DecommissionComplete(idx) {
|
|
if fn := z.decommissionCancelers[idx]; fn != nil {
|
|
defer fn() // cancel any active thread.
|
|
}
|
|
if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
|
|
return err
|
|
}
|
|
globalNotificationSys.ReloadPoolMeta(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (z *erasureServerPools) getBucketsToDecommission(ctx context.Context) ([]decomBucketInfo, error) {
|
|
buckets, err := z.ListBuckets(ctx, BucketOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
decomBuckets := make([]decomBucketInfo, len(buckets))
|
|
for i := range buckets {
|
|
decomBuckets[i] = decomBucketInfo{
|
|
Name: buckets[i].Name,
|
|
}
|
|
}
|
|
|
|
// Buckets data are dispersed in multiple zones/sets, make
|
|
// sure to decommission the necessary metadata.
|
|
decomBuckets = append(decomBuckets, decomBucketInfo{
|
|
Name: minioMetaBucket,
|
|
Prefix: minioConfigPrefix,
|
|
})
|
|
decomBuckets = append(decomBuckets, decomBucketInfo{
|
|
Name: minioMetaBucket,
|
|
Prefix: bucketMetaPrefix,
|
|
})
|
|
|
|
return decomBuckets, nil
|
|
}
|
|
|
|
func (z *erasureServerPools) StartDecommission(ctx context.Context, indices ...int) (err error) {
|
|
if len(indices) == 0 {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
if z.SinglePool() {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
decomBuckets, err := z.getBucketsToDecommission(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, bucket := range decomBuckets {
|
|
z.HealBucket(ctx, bucket.Name, madmin.HealOpts{})
|
|
}
|
|
|
|
// Create .minio.sys/config, .minio.sys/buckets paths if missing,
|
|
// this code is present to avoid any missing meta buckets on other
|
|
// pools.
|
|
for _, metaBucket := range []string{
|
|
pathJoin(minioMetaBucket, minioConfigPrefix),
|
|
pathJoin(minioMetaBucket, bucketMetaPrefix),
|
|
} {
|
|
var bucketExists BucketExists
|
|
if err = z.MakeBucket(ctx, metaBucket, MakeBucketOptions{}); err != nil {
|
|
if !errors.As(err, &bucketExists) {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
z.poolMetaMutex.Lock()
|
|
defer z.poolMetaMutex.Unlock()
|
|
|
|
for _, idx := range indices {
|
|
pi, err := z.getDecommissionPoolSpaceInfo(idx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = z.poolMeta.Decommission(idx, pi); err != nil {
|
|
return err
|
|
}
|
|
|
|
z.poolMeta.QueueBuckets(idx, decomBuckets)
|
|
}
|
|
|
|
if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
|
|
return err
|
|
}
|
|
|
|
globalNotificationSys.ReloadPoolMeta(ctx)
|
|
|
|
return nil
|
|
}
|
|
|
|
func auditLogDecom(ctx context.Context, apiName, bucket, object, versionID string, err error) {
|
|
errStr := ""
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
auditLogInternal(ctx, AuditLogOptions{
|
|
Event: "decommission",
|
|
APIName: apiName,
|
|
Bucket: bucket,
|
|
Object: object,
|
|
VersionID: versionID,
|
|
Error: errStr,
|
|
})
|
|
}
|