2024-09-06 06:20:19 -07:00
// Copyright (c) 2015-2024 MinIO, Inc.
2022-01-10 09:07:49 -08:00
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"encoding/binary"
"errors"
"fmt"
2023-05-16 13:14:37 -07:00
"io"
2022-07-07 12:31:44 -07:00
"math/rand"
2022-01-10 09:07:49 -08:00
"net/http"
"sort"
2022-04-28 16:27:53 -07:00
"strings"
2022-01-10 09:07:49 -08:00
"time"
"github.com/dustin/go-humanize"
2023-06-19 17:53:08 -07:00
"github.com/minio/madmin-go/v3"
2024-04-22 10:49:30 -07:00
"github.com/minio/minio/internal/bucket/lifecycle"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/bucket/versioning"
2022-01-10 09:07:49 -08:00
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
2024-05-24 16:05:23 -07:00
"github.com/minio/pkg/v3/console"
"github.com/minio/pkg/v3/env"
"github.com/minio/pkg/v3/workers"
2022-01-10 09:07:49 -08:00
)
// PoolDecommissionInfo currently decommissioning information
type PoolDecommissionInfo struct {
	// StartTime is when the decommission was (re)started; it is reset to the
	// zero time when the decommission is marked failed or canceled.
	StartTime time.Time ` json:"startTime" msg:"st" `
	// StartSize and CurrentSize are both seeded with the pool's free space
	// when decommission starts, TotalSize with the pool's total space.
	StartSize   int64 ` json:"startSize" msg:"ss" `
	TotalSize   int64 ` json:"totalSize" msg:"ts" `
	CurrentSize int64 ` json:"currentSize" msg:"cs" `

	// Terminal/intermediate state flags; the state setters on poolMeta keep
	// at most one of them set at a time.
	Complete bool ` json:"complete" msg:"cmp" `
	Failed   bool ` json:"failed" msg:"fl" `
	Canceled bool ` json:"canceled" msg:"cnl" `

	// Internal information.
	QueuedBuckets         []string ` json:"-" msg:"bkts" `
	DecommissionedBuckets []string ` json:"-" msg:"dbkts" `

	// Last bucket/object decommissioned.
	Bucket string ` json:"-" msg:"bkt" `
	// Captures prefix that is currently being
	// decommissioned inside the 'Bucket'
	Prefix string ` json:"-" msg:"pfx" `
	Object string ` json:"-" msg:"obj" `

	// Verbose information
	ItemsDecommissioned     int64 ` json:"objectsDecommissioned" msg:"id" `
	ItemsDecommissionFailed int64 ` json:"objectsDecommissionedFailed" msg:"idf" `
	BytesDone               int64 ` json:"bytesDecommissioned" msg:"bd" `
	BytesFailed             int64 ` json:"bytesDecommissionedFailed" msg:"bf" `
}

// Clone makes a deep copy of PoolDecommissionInfo.
//
// A nil receiver yields nil. The bucket lists are copied into fresh backing
// arrays: bucketPop removes queue entries in place, so a clone that merely
// shared the slices would silently change (or be corrupted) whenever the
// original is updated afterwards. Nil lists stay nil in the copy.
func (pd *PoolDecommissionInfo) Clone() *PoolDecommissionInfo {
	if pd == nil {
		return nil
	}

	// Copying the struct value carries over every scalar field.
	cloned := *pd

	// Detach the slices from the original's backing arrays.
	if pd.QueuedBuckets != nil {
		cloned.QueuedBuckets = append(make([]string, 0, len(pd.QueuedBuckets)), pd.QueuedBuckets...)
	}
	if pd.DecommissionedBuckets != nil {
		cloned.DecommissionedBuckets = append(make([]string, 0, len(pd.DecommissionedBuckets)), pd.DecommissionedBuckets...)
	}
	return &cloned
}
2022-01-10 09:07:49 -08:00
// bucketPop should be called when a bucket is done decommissioning.
// Adds the bucket to the list of decommissioned buckets and updates resume numbers.
2024-03-18 15:25:45 -07:00
func ( pd * PoolDecommissionInfo ) bucketPop ( bucket string ) bool {
2022-01-10 09:07:49 -08:00
pd . DecommissionedBuckets = append ( pd . DecommissionedBuckets , bucket )
for i , b := range pd . QueuedBuckets {
if b == bucket {
// Bucket is done.
pd . QueuedBuckets = append ( pd . QueuedBuckets [ : i ] , pd . QueuedBuckets [ i + 1 : ] ... )
2022-01-10 17:26:00 -08:00
// Clear tracker info.
if pd . Bucket == bucket {
pd . Bucket = "" // empty this out for next bucket
2022-07-04 14:02:54 -07:00
pd . Prefix = "" // empty this out for the next bucket
2022-01-10 17:26:00 -08:00
pd . Object = "" // empty this out for next object
}
2024-03-18 15:25:45 -07:00
return true
2022-01-10 09:07:49 -08:00
}
}
2024-03-18 15:25:45 -07:00
return false
2022-01-10 09:07:49 -08:00
}
// isBucketDecommissioned reports whether bucket is present in the list of
// already decommissioned buckets.
func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
	done := pd.DecommissionedBuckets
	for i := range done {
		if done[i] == bucket {
			return true
		}
	}
	return false
}
2022-07-04 14:02:54 -07:00
func ( pd * PoolDecommissionInfo ) bucketPush ( bucket decomBucketInfo ) {
2022-01-10 09:07:49 -08:00
for _ , b := range pd . QueuedBuckets {
if pd . isBucketDecommissioned ( b ) {
return
}
2022-07-04 14:02:54 -07:00
if b == bucket . String ( ) {
2022-01-10 09:07:49 -08:00
return
}
}
2022-07-04 14:02:54 -07:00
pd . QueuedBuckets = append ( pd . QueuedBuckets , bucket . String ( ) )
pd . Bucket = bucket . Name
pd . Prefix = bucket . Prefix
2022-01-10 09:07:49 -08:00
}
// PoolStatus captures current pool status
type PoolStatus struct {
	// ID is the index the pool had in the server pools list when its entry
	// was first created.
	ID int ` json:"id" msg:"id" `
	// CmdLine is the command line of the pool endpoints; it is the key used
	// to match persisted entries against the pools of the running server.
	CmdLine string ` json:"cmdline" msg:"cl" `
	// LastUpdate is refreshed whenever the decommission state changes or the
	// metadata is persisted.
	LastUpdate time.Time ` json:"lastUpdate" msg:"lu" `
	// Decommission is nil as long as no decommission was started on the pool.
	Decommission *PoolDecommissionInfo ` json:"decommissionInfo,omitempty" msg:"dec" `
}
2023-09-16 02:28:06 -07:00
// Clone returns a copy of PoolStatus, including a clone of the decommission
// information (which stays nil when there is none).
func (ps PoolStatus) Clone() PoolStatus {
	// ps is already a private copy of the value; only the pointer field
	// needs to be replaced by its own clone.
	ps.Decommission = ps.Decommission.Clone()
	return ps
}
2022-01-10 09:07:49 -08:00
//go:generate msgp -file $GOFILE -unexported
type poolMeta struct {
Version int ` msg:"v" `
Pools [ ] PoolStatus ` msg:"pls" `
2023-10-12 15:30:42 -07:00
// Value should not be saved when we have not loaded anything yet.
dontSave bool ` msg:"-" `
2022-01-10 09:07:49 -08:00
}
// A decommission resumable tells us if decommission is worth
// resuming upon restart of a cluster.
2023-01-16 21:36:34 +05:30
func ( p * poolMeta ) returnResumablePools ( ) [ ] PoolStatus {
2022-01-10 09:07:49 -08:00
var newPools [ ] PoolStatus
for _ , pool := range p . Pools {
if pool . Decommission == nil {
continue
}
if pool . Decommission . Complete || pool . Decommission . Canceled {
// Do not resume decommission upon startup for
// - decommission complete
// - decommission canceled
continue
} // In all other situations we need to resume
newPools = append ( newPools , pool )
}
2023-01-16 21:36:34 +05:30
return newPools
2022-01-10 09:07:49 -08:00
}
func ( p * poolMeta ) DecommissionComplete ( idx int ) bool {
2022-04-06 23:42:05 -07:00
if p . Pools [ idx ] . Decommission != nil && ! p . Pools [ idx ] . Decommission . Complete {
2022-01-11 18:48:43 -08:00
p . Pools [ idx ] . LastUpdate = UTCNow ( )
2022-01-10 09:07:49 -08:00
p . Pools [ idx ] . Decommission . Complete = true
p . Pools [ idx ] . Decommission . Failed = false
p . Pools [ idx ] . Decommission . Canceled = false
return true
}
return false
}
func ( p * poolMeta ) DecommissionFailed ( idx int ) bool {
2022-04-06 23:42:05 -07:00
if p . Pools [ idx ] . Decommission != nil && ! p . Pools [ idx ] . Decommission . Failed {
2022-01-11 18:48:43 -08:00
p . Pools [ idx ] . LastUpdate = UTCNow ( )
2022-01-10 09:07:49 -08:00
p . Pools [ idx ] . Decommission . StartTime = time . Time { }
p . Pools [ idx ] . Decommission . Complete = false
p . Pools [ idx ] . Decommission . Failed = true
p . Pools [ idx ] . Decommission . Canceled = false
return true
}
return false
}
func ( p * poolMeta ) DecommissionCancel ( idx int ) bool {
2022-04-06 23:42:05 -07:00
if p . Pools [ idx ] . Decommission != nil && ! p . Pools [ idx ] . Decommission . Canceled {
2022-01-11 18:48:43 -08:00
p . Pools [ idx ] . LastUpdate = UTCNow ( )
2022-01-10 09:07:49 -08:00
p . Pools [ idx ] . Decommission . StartTime = time . Time { }
p . Pools [ idx ] . Decommission . Complete = false
p . Pools [ idx ] . Decommission . Failed = false
p . Pools [ idx ] . Decommission . Canceled = true
return true
}
return false
}
// isBucketDecommissioned reports whether bucket is recorded as already
// decommissioned for pool idx. It returns false when the pool has no
// decommission information, matching the nil handling of the other
// accessors on poolMeta instead of dereferencing a nil pointer.
func (p poolMeta) isBucketDecommissioned(idx int, bucket string) bool {
	decom := p.Pools[idx].Decommission
	if decom == nil {
		// Decommission not in progress.
		return false
	}
	return decom.isBucketDecommissioned(bucket)
}
2024-03-18 15:25:45 -07:00
func ( p * poolMeta ) BucketDone ( idx int , bucket decomBucketInfo ) bool {
2022-01-10 09:07:49 -08:00
if p . Pools [ idx ] . Decommission == nil {
// Decommission not in progress.
2024-03-18 15:25:45 -07:00
return false
2022-01-10 09:07:49 -08:00
}
2024-03-18 15:25:45 -07:00
return p . Pools [ idx ] . Decommission . bucketPop ( bucket . String ( ) )
2022-01-10 09:07:49 -08:00
}
// ResumeBucketObject returns the bucket and object last tracked for pool
// idx. Both are empty when the pool has no decommission information.
func (p poolMeta) ResumeBucketObject(idx int) (bucket, object string) {
	decom := p.Pools[idx].Decommission
	if decom == nil {
		return "", ""
	}
	return decom.Bucket, decom.Object
}
// TrackCurrentBucketObject remembers bucket and object as the current
// position of the decommission of pool idx. It is a no-op when the pool has
// no decommission information.
func (p *poolMeta) TrackCurrentBucketObject(idx int, bucket string, object string) {
	if decom := p.Pools[idx].Decommission; decom != nil {
		decom.Bucket, decom.Object = bucket, object
	}
}
2022-07-04 14:02:54 -07:00
func ( p * poolMeta ) PendingBuckets ( idx int ) [ ] decomBucketInfo {
2022-01-10 09:07:49 -08:00
if p . Pools [ idx ] . Decommission == nil {
// Decommission not in progress.
return nil
}
2022-07-04 14:02:54 -07:00
decomBuckets := make ( [ ] decomBucketInfo , len ( p . Pools [ idx ] . Decommission . QueuedBuckets ) )
for i := range decomBuckets {
bucket , prefix := path2BucketObject ( p . Pools [ idx ] . Decommission . QueuedBuckets [ i ] )
decomBuckets [ i ] = decomBucketInfo {
Name : bucket ,
Prefix : prefix ,
}
}
return decomBuckets
}
//msgp:ignore decomBucketInfo
type decomBucketInfo struct {
Name string
Prefix string
}
func ( db decomBucketInfo ) String ( ) string {
return pathJoin ( db . Name , db . Prefix )
2022-01-10 09:07:49 -08:00
}
2022-07-04 14:02:54 -07:00
func ( p * poolMeta ) QueueBuckets ( idx int , buckets [ ] decomBucketInfo ) {
2022-01-10 09:07:49 -08:00
// add new queued buckets
for _ , bucket := range buckets {
2022-07-04 14:02:54 -07:00
p . Pools [ idx ] . Decommission . bucketPush ( bucket )
2022-01-10 09:07:49 -08:00
}
}
// Sentinel errors describing the decommission state of a pool.
var (
	// errDecommissionAlreadyRunning is returned when a decommission is
	// requested while one is still on going for the pool.
	errDecommissionAlreadyRunning = errors.New("decommission is already in progress")

	// errDecommissionComplete signals that the pool is fully decommissioned.
	errDecommissionComplete = errors.New("decommission is complete, please remove the servers from command-line")

	// errDecommissionNotStarted signals that no decommission is in progress.
	errDecommissionNotStarted = errors.New("decommission is not in progress")
)
2022-01-11 18:48:43 -08:00
func ( p * poolMeta ) Decommission ( idx int , pi poolSpaceInfo ) error {
2022-05-03 21:36:08 +01:00
// Return an error when there is decommission on going - the user needs
// to explicitly cancel it first in order to restart decommissioning again.
if p . Pools [ idx ] . Decommission != nil &&
! p . Pools [ idx ] . Decommission . Complete &&
! p . Pools [ idx ] . Decommission . Failed &&
! p . Pools [ idx ] . Decommission . Canceled {
return errDecommissionAlreadyRunning
2022-01-10 09:07:49 -08:00
}
2022-05-03 21:36:08 +01:00
now := UTCNow ( )
p . Pools [ idx ] . LastUpdate = now
p . Pools [ idx ] . Decommission = & PoolDecommissionInfo {
StartTime : now ,
StartSize : pi . Free ,
CurrentSize : pi . Free ,
TotalSize : pi . Total ,
2022-01-10 09:07:49 -08:00
}
2022-05-03 21:36:08 +01:00
return nil
2022-01-10 09:07:49 -08:00
}
func ( p poolMeta ) IsSuspended ( idx int ) bool {
2023-10-12 15:30:42 -07:00
if idx >= len ( p . Pools ) {
// We don't really know if the pool is suspended or not, since it doesn't exist.
return false
}
2022-01-10 09:07:49 -08:00
return p . Pools [ idx ] . Decommission != nil
}
2022-01-14 10:32:35 -08:00
func ( p * poolMeta ) validate ( pools [ ] * erasureSets ) ( bool , error ) {
2022-01-10 09:07:49 -08:00
type poolInfo struct {
2022-01-14 10:32:35 -08:00
position int
completed bool
decomStarted bool // started but not finished yet
2022-01-10 09:07:49 -08:00
}
rememberedPools := make ( map [ string ] poolInfo )
for idx , pool := range p . Pools {
complete := false
2022-01-14 10:32:35 -08:00
decomStarted := false
if pool . Decommission != nil {
if pool . Decommission . Complete {
complete = true
}
decomStarted = true
2022-01-10 09:07:49 -08:00
}
rememberedPools [ pool . CmdLine ] = poolInfo {
2022-01-14 10:32:35 -08:00
position : idx ,
completed : complete ,
decomStarted : decomStarted ,
2022-01-10 09:07:49 -08:00
}
}
specifiedPools := make ( map [ string ] int )
for idx , pool := range pools {
specifiedPools [ pool . endpoints . CmdLine ] = idx
}
2022-04-28 16:27:53 -07:00
var update bool
2023-05-16 16:00:57 -07:00
// Check if specified pools need to be removed from decommissioned pool.
2022-01-10 09:07:49 -08:00
for k := range specifiedPools {
pi , ok := rememberedPools [ k ]
2022-04-28 16:27:53 -07:00
if ! ok {
2023-05-16 16:00:57 -07:00
// we do not have the pool anymore that we previously remembered, since all
// the CLI checks out we can allow updates since we are mostly adding a pool here.
update = true
2022-04-28 16:27:53 -07:00
}
2022-01-10 09:07:49 -08:00
if ok && pi . completed {
2024-07-01 22:38:46 +08:00
logger . LogIf ( GlobalContext , "decommission" , fmt . Errorf ( "pool(%s) = %s is decommissioned, please remove from server command line" , humanize . Ordinal ( pi . position + 1 ) , k ) )
2022-01-10 09:07:49 -08:00
}
}
2023-05-16 16:00:57 -07:00
if len ( specifiedPools ) == len ( rememberedPools ) {
2022-01-10 09:07:49 -08:00
for k , pi := range rememberedPools {
pos , ok := specifiedPools [ k ]
2022-11-07 00:11:58 -08:00
if ok && pos != pi . position {
2023-05-16 16:00:57 -07:00
update = true // pool order is changing, its okay to allow it.
2022-01-10 09:07:49 -08:00
}
}
}
2022-04-28 16:27:53 -07:00
if ! update {
2023-05-16 16:00:57 -07:00
update = len ( specifiedPools ) != len ( rememberedPools )
2022-01-14 10:32:35 -08:00
}
2022-11-16 07:59:10 -08:00
2022-01-14 10:32:35 -08:00
return update , nil
}
func ( p * poolMeta ) load ( ctx context . Context , pool * erasureSets , pools [ ] * erasureSets ) error {
data , err := readConfig ( ctx , pool , poolMetaName )
if err != nil {
if errors . Is ( err , errConfigNotFound ) || isErrObjectNotFound ( err ) {
return nil
}
return err
}
if len ( data ) == 0 {
// Seems to be empty create a new poolMeta object.
return nil
}
if len ( data ) <= 4 {
return fmt . Errorf ( "poolMeta: no data" )
}
// Read header
switch binary . LittleEndian . Uint16 ( data [ 0 : 2 ] ) {
case poolMetaFormat :
default :
return fmt . Errorf ( "poolMeta: unknown format: %d" , binary . LittleEndian . Uint16 ( data [ 0 : 2 ] ) )
}
switch binary . LittleEndian . Uint16 ( data [ 2 : 4 ] ) {
case poolMetaVersion :
default :
return fmt . Errorf ( "poolMeta: unknown version: %d" , binary . LittleEndian . Uint16 ( data [ 2 : 4 ] ) )
}
// OK, parse data.
if _ , err = p . UnmarshalMsg ( data [ 4 : ] ) ; err != nil {
return err
}
switch p . Version {
case poolMetaVersionV1 :
default :
return fmt . Errorf ( "unexpected pool meta version: %d" , p . Version )
}
return nil
2022-01-10 09:07:49 -08:00
}
func ( p * poolMeta ) CountItem ( idx int , size int64 , failed bool ) {
pd := p . Pools [ idx ] . Decommission
2023-09-16 02:28:06 -07:00
if pd == nil {
return
}
if failed {
pd . ItemsDecommissionFailed ++
pd . BytesFailed += size
} else {
pd . ItemsDecommissioned ++
pd . BytesDone += size
2022-01-10 09:07:49 -08:00
}
2023-09-16 02:28:06 -07:00
p . Pools [ idx ] . Decommission = pd
2022-01-10 09:07:49 -08:00
}
2022-01-11 18:48:43 -08:00
func ( p * poolMeta ) updateAfter ( ctx context . Context , idx int , pools [ ] * erasureSets , duration time . Duration ) ( bool , error ) {
2022-01-10 09:07:49 -08:00
if p . Pools [ idx ] . Decommission == nil {
2022-01-11 18:48:43 -08:00
return false , errInvalidArgument
2022-01-10 09:07:49 -08:00
}
2022-01-11 18:48:43 -08:00
now := UTCNow ( )
if now . Sub ( p . Pools [ idx ] . LastUpdate ) >= duration {
if serverDebugLog {
2022-08-04 16:10:08 -07:00
console . Debugf ( "decommission: persisting poolMeta on drive: threshold:%s, poolMeta:%#v\n" , now . Sub ( p . Pools [ idx ] . LastUpdate ) , p . Pools [ idx ] )
2022-01-11 18:48:43 -08:00
}
p . Pools [ idx ] . LastUpdate = now
if err := p . save ( ctx , pools ) ; err != nil {
return false , err
}
return true , nil
2022-01-10 09:07:49 -08:00
}
2022-01-11 18:48:43 -08:00
return false , nil
2022-01-10 09:07:49 -08:00
}
func ( p poolMeta ) save ( ctx context . Context , pools [ ] * erasureSets ) error {
2023-10-12 15:30:42 -07:00
if p . dontSave {
return nil
}
2022-01-10 09:07:49 -08:00
data := make ( [ ] byte , 4 , p . Msgsize ( ) + 4 )
// Initialize the header.
binary . LittleEndian . PutUint16 ( data [ 0 : 2 ] , poolMetaFormat )
binary . LittleEndian . PutUint16 ( data [ 2 : 4 ] , poolMetaVersion )
buf , err := p . MarshalMsg ( data )
if err != nil {
return err
}
// Saves on all pools to make sure decommissioning of first pool is allowed.
2023-06-04 22:20:21 +01:00
for i , eset := range pools {
2022-01-10 09:07:49 -08:00
if err = saveConfig ( ctx , eset , poolMetaName , buf ) ; err != nil {
2023-06-21 08:49:28 -07:00
if ! errors . Is ( err , context . Canceled ) {
2024-04-04 13:04:40 +01:00
storageLogIf ( ctx , fmt . Errorf ( "saving pool.bin for pool index %d failed with: %v" , i , err ) )
2023-06-21 08:49:28 -07:00
}
2022-01-10 09:07:49 -08:00
return err
}
}
return nil
}
// Identity of the persisted pool metadata.
const (
	// poolMetaName is the name of the config object holding the metadata.
	poolMetaName = "pool.bin"

	// poolMetaFormat is written into the first two header bytes.
	poolMetaFormat = 1

	// poolMetaVersionV1 is the first (and currently only) metadata version.
	poolMetaVersionV1 = 1

	// poolMetaVersion is the version written into header bytes two and three.
	poolMetaVersion = poolMetaVersionV1
)
// Init() initializes pools and saves additional information about them
// in 'pool.bin', this is eventually used for decommissioning the pool.
func ( z * erasureServerPools ) Init ( ctx context . Context ) error {
2022-10-25 12:36:57 -07:00
// Load rebalance metadata if present
2024-09-06 06:20:19 -07:00
if err := z . loadRebalanceMeta ( ctx ) ; err == nil {
// Start rebalance routine if we can reload rebalance metadata.
z . StartRebalance ( )
2022-10-25 12:36:57 -07:00
}
2022-01-10 09:07:49 -08:00
meta := poolMeta { }
2022-01-14 10:32:35 -08:00
if err := meta . load ( ctx , z . serverPools [ 0 ] , z . serverPools ) ; err != nil {
return err
}
update , err := meta . validate ( z . serverPools )
2022-01-10 09:07:49 -08:00
if err != nil {
return err
}
// if no update is needed return right away.
if ! update {
2023-10-12 15:30:42 -07:00
z . poolMetaMutex . Lock ( )
2022-04-06 23:42:05 -07:00
z . poolMeta = meta
2023-10-12 15:30:42 -07:00
z . poolMetaMutex . Unlock ( )
2023-05-16 16:00:57 -07:00
} else {
2023-10-12 15:30:42 -07:00
newMeta := newPoolMeta ( z , meta )
2023-06-23 07:44:18 -07:00
if err = newMeta . save ( ctx , z . serverPools ) ; err != nil {
2023-05-16 16:00:57 -07:00
return err
}
2023-10-12 15:30:42 -07:00
z . poolMetaMutex . Lock ( )
2023-06-23 07:44:18 -07:00
z . poolMeta = newMeta
2023-10-12 15:30:42 -07:00
z . poolMetaMutex . Unlock ( )
2023-05-16 16:00:57 -07:00
}
2023-01-16 21:36:34 +05:30
2023-05-16 16:00:57 -07:00
pools := meta . returnResumablePools ( )
poolIndices := make ( [ ] int , 0 , len ( pools ) )
for _ , pool := range pools {
idx := globalEndpoints . GetPoolIdx ( pool . CmdLine )
if idx == - 1 {
return fmt . Errorf ( "unexpected state present for decommission status pool(%s) not found" , pool . CmdLine )
}
poolIndices = append ( poolIndices , idx )
}
if len ( poolIndices ) > 0 && globalEndpoints [ poolIndices [ 0 ] ] . Endpoints [ 0 ] . IsLocal {
go func ( ) {
2024-05-01 08:18:21 -07:00
// Resume decommissioning of pools, but wait 3 minutes for cluster to stabilize.
if err := sleepContext ( ctx , 3 * time . Minute ) ; err != nil {
return
}
2023-05-16 16:00:57 -07:00
r := rand . New ( rand . NewSource ( time . Now ( ) . UnixNano ( ) ) )
for {
if err := z . Decommission ( ctx , poolIndices ... ) ; err != nil {
if errors . Is ( err , errDecommissionAlreadyRunning ) {
// A previous decommission running found restart it.
for _ , idx := range poolIndices {
z . doDecommissionInRoutine ( ctx , idx )
2023-01-16 21:36:34 +05:30
}
return
2022-04-06 23:42:05 -07:00
}
2023-05-16 16:00:57 -07:00
if configRetriableErrors ( err ) {
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , fmt . Errorf ( "Unable to resume decommission of pools %v: %w: retrying.." , pools , err ) )
2023-05-16 16:00:57 -07:00
time . Sleep ( time . Second + time . Duration ( r . Float64 ( ) * float64 ( 5 * time . Second ) ) )
continue
}
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , fmt . Errorf ( "Unable to resume decommission of pool %v: %w" , pools , err ) )
2023-05-16 16:00:57 -07:00
return
2023-01-16 21:36:34 +05:30
}
2023-05-16 16:00:57 -07:00
}
} ( )
2022-01-10 09:07:49 -08:00
}
return nil
}
2023-10-12 15:30:42 -07:00
// newPoolMeta builds fresh pool metadata for the server pools of z.
//
// It is used when a new pool was added, on a fresh installation, or on an
// existing installation with a pool removed. Pools already present in
// prevMeta (matched by command line) keep their previous status; all other
// pools get a new status entry carrying their index and the current time.
func newPoolMeta(z *erasureServerPools, prevMeta poolMeta) poolMeta {
	fresh := poolMeta{Version: poolMetaVersion} // to update write poolMeta fresh.

	for idx, pool := range z.serverPools {
		cmdLine := pool.endpoints.CmdLine

		// Look for a previous status of this pool.
		carried := -1
		for i := range prevMeta.Pools {
			if prevMeta.Pools[i].CmdLine == cmdLine {
				carried = i
				break
			}
		}

		if carried >= 0 {
			// Preserve any current pool status.
			fresh.Pools = append(fresh.Pools, prevMeta.Pools[carried])
			continue
		}

		fresh.Pools = append(fresh.Pools, PoolStatus{
			CmdLine:    cmdLine,
			ID:         idx,
			LastUpdate: UTCNow(),
		})
	}
	return fresh
}
2022-10-25 12:36:57 -07:00
func ( z * erasureServerPools ) IsDecommissionRunning ( ) bool {
z . poolMetaMutex . RLock ( )
defer z . poolMetaMutex . RUnlock ( )
meta := z . poolMeta
for _ , pool := range meta . Pools {
2023-01-24 16:07:59 +01:00
if pool . Decommission != nil &&
! pool . Decommission . Complete &&
! pool . Decommission . Failed &&
! pool . Decommission . Canceled {
2022-10-25 12:36:57 -07:00
return true
}
}
return false
}
2024-08-09 19:30:44 -07:00
func ( z * erasureServerPools ) decommissionObject ( ctx context . Context , idx int , bucket string , gr * GetObjectReader ) ( err error ) {
2022-01-10 09:07:49 -08:00
objInfo := gr . ObjInfo
2022-05-04 08:45:27 +01:00
defer func ( ) {
gr . Close ( )
auditLogDecom ( ctx , "DecomCopyData" , objInfo . Bucket , objInfo . Name , objInfo . VersionID , err )
} ( )
2022-07-16 19:35:24 -07:00
actualSize , err := objInfo . GetActualSize ( )
if err != nil {
return err
}
2022-01-10 09:07:49 -08:00
if objInfo . isMultipart ( ) {
2022-08-30 01:57:16 +02:00
res , err := z . NewMultipartUpload ( ctx , bucket , objInfo . Name , ObjectOptions {
2024-08-09 19:30:44 -07:00
VersionID : objInfo . VersionID ,
UserDefined : objInfo . UserDefined ,
NoAuditLog : true ,
SrcPoolIdx : idx ,
DataMovement : true ,
2022-01-10 17:26:00 -08:00
} )
2022-01-10 09:07:49 -08:00
if err != nil {
2022-07-14 20:44:22 -07:00
return fmt . Errorf ( "decommissionObject: NewMultipartUpload() %w" , err )
2022-01-10 09:07:49 -08:00
}
2024-03-06 03:43:16 -08:00
defer z . AbortMultipartUpload ( ctx , bucket , objInfo . Name , res . UploadID , ObjectOptions { NoAuditLog : true } )
2022-01-10 17:26:00 -08:00
parts := make ( [ ] CompletePart , len ( objInfo . Parts ) )
for i , part := range objInfo . Parts {
2023-09-19 01:00:54 +08:00
hr , err := hash . NewReader ( ctx , io . LimitReader ( gr , part . Size ) , part . Size , "" , "" , part . ActualSize )
2022-01-10 09:07:49 -08:00
if err != nil {
2023-05-16 13:14:37 -07:00
return fmt . Errorf ( "decommissionObject: hash.NewReader() %w" , err )
2022-01-10 09:07:49 -08:00
}
2022-08-30 01:57:16 +02:00
pi , err := z . PutObjectPart ( ctx , bucket , objInfo . Name , res . UploadID ,
2022-01-10 09:07:49 -08:00
part . Number ,
NewPutObjReader ( hr ) ,
2022-07-16 19:35:24 -07:00
ObjectOptions {
PreserveETag : part . ETag , // Preserve original ETag to ensure same metadata.
IndexCB : func ( ) [ ] byte {
return part . Index // Preserve part Index to ensure decompression works.
} ,
2024-03-06 03:43:16 -08:00
NoAuditLog : true ,
2022-07-16 19:35:24 -07:00
} )
2022-01-10 09:07:49 -08:00
if err != nil {
2022-07-14 20:44:22 -07:00
return fmt . Errorf ( "decommissionObject: PutObjectPart() %w" , err )
2022-01-10 09:07:49 -08:00
}
2022-01-10 17:26:00 -08:00
parts [ i ] = CompletePart {
2022-08-30 01:57:16 +02:00
ETag : pi . ETag ,
PartNumber : pi . PartNumber ,
ChecksumCRC32 : pi . ChecksumCRC32 ,
ChecksumCRC32C : pi . ChecksumCRC32C ,
ChecksumSHA256 : pi . ChecksumSHA256 ,
ChecksumSHA1 : pi . ChecksumSHA1 ,
2022-01-10 17:26:00 -08:00
}
2022-01-10 09:07:49 -08:00
}
2022-08-30 01:57:16 +02:00
_ , err = z . CompleteMultipartUpload ( ctx , bucket , objInfo . Name , res . UploadID , parts , ObjectOptions {
2024-08-09 19:30:44 -07:00
SrcPoolIdx : idx ,
2024-02-03 23:03:30 +01:00
DataMovement : true ,
MTime : objInfo . ModTime ,
2024-03-06 03:43:16 -08:00
NoAuditLog : true ,
2022-01-10 09:07:49 -08:00
} )
2022-07-14 20:44:22 -07:00
if err != nil {
err = fmt . Errorf ( "decommissionObject: CompleteMultipartUpload() %w" , err )
}
2022-01-10 09:07:49 -08:00
return err
}
2022-07-16 19:35:24 -07:00
2023-09-19 01:00:54 +08:00
hr , err := hash . NewReader ( ctx , io . LimitReader ( gr , objInfo . Size ) , objInfo . Size , "" , "" , actualSize )
2022-01-10 09:07:49 -08:00
if err != nil {
2023-05-16 13:14:37 -07:00
return fmt . Errorf ( "decommissionObject: hash.NewReader() %w" , err )
2022-01-10 09:07:49 -08:00
}
2024-02-03 23:03:30 +01:00
2022-01-10 09:07:49 -08:00
_ , err = z . PutObject ( ctx ,
bucket ,
objInfo . Name ,
NewPutObjReader ( hr ) ,
ObjectOptions {
2024-02-03 23:03:30 +01:00
DataMovement : true ,
2024-08-09 19:30:44 -07:00
SrcPoolIdx : idx ,
2022-07-16 19:35:24 -07:00
VersionID : objInfo . VersionID ,
MTime : objInfo . ModTime ,
UserDefined : objInfo . UserDefined ,
PreserveETag : objInfo . ETag , // Preserve original ETag to ensure same metadata.
IndexCB : func ( ) [ ] byte {
return objInfo . Parts [ 0 ] . Index // Preserve part Index to ensure decompression works.
} ,
2024-03-06 03:43:16 -08:00
NoAuditLog : true ,
2022-01-10 09:07:49 -08:00
} )
2022-07-14 20:44:22 -07:00
if err != nil {
err = fmt . Errorf ( "decommissionObject: PutObject() %w" , err )
}
2022-01-10 09:07:49 -08:00
return err
}
// versionsSorter sorts FileInfo slices by version.
2022-08-26 12:52:29 -07:00
//
2022-01-10 09:07:49 -08:00
//msgp:ignore versionsSorter
type versionsSorter [ ] FileInfo
func ( v versionsSorter ) reverse ( ) {
sort . Slice ( v , func ( i , j int ) bool {
return v [ i ] . ModTime . Before ( v [ j ] . ModTime )
} )
}
2023-01-12 01:22:51 +01:00
func ( set * erasureObjects ) listObjectsToDecommission ( ctx context . Context , bi decomBucketInfo , fn func ( entry metaCacheEntry ) ) error {
2024-03-06 03:43:16 -08:00
disks , _ := set . getOnlineDisksWithHealing ( false )
2023-01-12 01:22:51 +01:00
if len ( disks ) == 0 {
return fmt . Errorf ( "no online drives found for set with endpoints %s" , set . getEndpoints ( ) )
}
2024-03-06 03:43:16 -08:00
// However many we ask, versions must exist on ~50%
listingQuorum := ( set . setDriveCount + 1 ) / 2
2023-11-17 21:09:09 -08:00
2023-01-12 01:22:51 +01:00
// How to resolve partial results.
resolver := metadataResolutionParams {
2024-03-06 03:43:16 -08:00
dirQuorum : listingQuorum , // make sure to capture all quorum ratios
objQuorum : listingQuorum , // make sure to capture all quorum ratios
2023-01-12 01:22:51 +01:00
bucket : bi . Name ,
}
err := listPathRaw ( ctx , listPathRawOptions {
disks : disks ,
bucket : bi . Name ,
path : bi . Prefix ,
recursive : true ,
forwardTo : "" ,
2024-03-06 03:43:16 -08:00
minDisks : listingQuorum ,
2023-01-12 01:22:51 +01:00
reportNotFound : false ,
agreed : fn ,
partial : func ( entries metaCacheEntries , _ [ ] error ) {
entry , ok := entries . resolve ( & resolver )
if ok {
fn ( * entry )
}
} ,
finished : nil ,
} )
return err
}
2022-07-04 14:02:54 -07:00
func ( z * erasureServerPools ) decommissionPool ( ctx context . Context , idx int , pool * erasureSets , bi decomBucketInfo ) error {
2022-05-04 08:45:27 +01:00
ctx = logger . SetReqInfo ( ctx , & logger . ReqInfo { } )
2024-03-06 03:43:16 -08:00
const envDecomWorkers = "_MINIO_DECOMMISSION_WORKERS"
workerSize , err := env . GetInt ( envDecomWorkers , len ( pool . sets ) )
2022-04-12 10:49:53 -07:00
if err != nil {
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , fmt . Errorf ( "invalid workers value err: %v, defaulting to %d" , err , len ( pool . sets ) ) )
2024-03-06 03:43:16 -08:00
workerSize = len ( pool . sets )
2022-04-12 10:49:53 -07:00
}
2022-04-07 23:19:13 -07:00
2024-03-06 03:43:16 -08:00
// Each decom worker needs one List() goroutine/worker
// add that many extra workers.
2023-05-09 16:37:31 -07:00
workerSize += len ( pool . sets )
wk , err := workers . New ( workerSize )
if err != nil {
return err
}
2022-01-10 09:07:49 -08:00
2024-04-22 10:49:30 -07:00
var vc * versioning . Versioning
var lc * lifecycle . Lifecycle
var lr objectlock . Retention
var rcfg * replication . Config
if bi . Name != minioMetaBucket {
vc , err = globalBucketVersioningSys . Get ( bi . Name )
if err != nil {
return err
}
2022-01-10 09:07:49 -08:00
2024-04-22 10:49:30 -07:00
// Check if the current bucket has a configured lifecycle policy
lc , err = globalLifecycleSys . Get ( bi . Name )
if err != nil && ! errors . Is ( err , BucketLifecycleNotFound { Bucket : bi . Name } ) {
return err
}
2022-07-14 16:47:09 -07:00
2024-04-22 10:49:30 -07:00
// Check if bucket is object locked.
lr , err = globalBucketObjectLockSys . Get ( bi . Name )
if err != nil {
return err
}
rcfg , err = getReplicationConfig ( ctx , bi . Name )
if err != nil {
return err
}
}
2022-07-14 16:47:09 -07:00
2023-07-19 13:09:37 -07:00
for setIdx , set := range pool . sets {
2023-05-25 09:18:49 -07:00
set := set
2022-07-14 16:47:09 -07:00
filterLifecycle := func ( bucket , object string , fi FileInfo ) bool {
if lc == nil {
return false
}
versioned := vc != nil && vc . Versioned ( object )
objInfo := fi . ToObjectInfo ( bucket , object , versioned )
2023-03-09 15:15:30 -08:00
2023-10-06 05:55:15 -07:00
evt := evalActionFromLifecycle ( ctx , * lc , lr , rcfg , objInfo )
2023-03-16 07:48:05 -07:00
switch {
2023-12-01 07:56:24 -08:00
case evt . Action . DeleteRestored ( ) : // if restored copy has expired, delete it synchronously
2023-05-22 15:28:56 -07:00
applyExpiryOnTransitionedObject ( ctx , z , objInfo , evt , lcEventSrc_Decom )
2023-03-16 07:48:05 -07:00
return false
case evt . Action . Delete ( ) :
2023-05-22 15:28:56 -07:00
globalExpiryState . enqueueByDays ( objInfo , evt , lcEventSrc_Decom )
2023-03-16 07:48:05 -07:00
return true
default :
return false
2022-07-14 16:47:09 -07:00
}
}
2022-01-10 09:07:49 -08:00
decommissionEntry := func ( entry metaCacheEntry ) {
2023-05-09 16:37:31 -07:00
defer wk . Give ( )
2022-04-26 20:06:41 -07:00
2022-01-10 09:07:49 -08:00
if entry . isDir ( ) {
return
}
2022-07-04 14:02:54 -07:00
fivs , err := entry . fileInfoVersions ( bi . Name )
2022-01-10 09:07:49 -08:00
if err != nil {
return
}
2022-03-07 16:18:57 -08:00
// We need a reversed order for decommissioning,
2022-01-10 09:07:49 -08:00
// to create the appropriate stack.
versionsSorter ( fivs . Versions ) . reverse ( )
2023-06-21 13:23:20 -07:00
var decommissioned , expired int
2022-01-10 09:07:49 -08:00
for _ , version := range fivs . Versions {
2023-06-27 11:59:40 -07:00
stopFn := globalDecommissionMetrics . log ( decomMetricDecommissionObject , idx , bi . Name , version . Name , version . VersionID )
2022-07-14 16:47:09 -07:00
// Apply lifecycle rules on the objects that are expired.
if filterLifecycle ( bi . Name , version . Name , version ) {
2023-06-21 13:23:20 -07:00
expired ++
decommissioned ++
2024-06-03 08:45:54 -07:00
stopFn ( version . Size , errors . New ( "ILM expired object/version will be skipped" ) )
2022-07-14 16:47:09 -07:00
continue
}
2023-05-25 09:18:49 -07:00
// any object with only single DEL marker we don't need
2023-06-21 08:49:28 -07:00
// to decommission, just skip it, this also includes
// any other versions that have already expired.
2023-06-21 13:23:20 -07:00
remainingVersions := len ( fivs . Versions ) - expired
2023-06-21 08:49:28 -07:00
if version . Deleted && remainingVersions == 1 {
2023-06-21 13:23:20 -07:00
decommissioned ++
2024-06-03 08:45:54 -07:00
stopFn ( version . Size , errors . New ( "DELETE marked object with no other non-current versions will be skipped" ) )
2023-05-25 09:18:49 -07:00
continue
}
versionID := version . VersionID
if versionID == "" {
versionID = nullVersionID
}
2024-08-09 19:30:44 -07:00
var failure , ignore bool
2022-01-10 09:07:49 -08:00
if version . Deleted {
_ , err := z . DeleteObject ( ctx ,
2022-07-04 14:02:54 -07:00
bi . Name ,
2022-01-10 09:07:49 -08:00
version . Name ,
ObjectOptions {
2023-05-25 09:18:49 -07:00
// Since we are preserving a delete marker, we have to make sure this is always true.
// regardless of the current configuration of the bucket we must preserve all versions
// on the pool being decommissioned.
Versioned : true ,
VersionID : versionID ,
2022-07-16 19:35:24 -07:00
MTime : version . ModTime ,
DeleteReplication : version . ReplicationState ,
2024-08-09 19:30:44 -07:00
SrcPoolIdx : idx ,
DataMovement : true ,
2022-07-16 19:35:24 -07:00
DeleteMarker : true , // make sure we create a delete marker
SkipDecommissioned : true , // make sure we skip the decommissioned pool
2024-03-06 03:43:16 -08:00
NoAuditLog : true ,
2022-01-10 09:07:49 -08:00
} )
2023-06-27 11:59:40 -07:00
if err != nil {
2024-08-09 19:30:44 -07:00
// This can happen when rebalance stop races with ongoing rebalance workers.
// These rebalance failures can be ignored.
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) || isDataMovementOverWriteErr ( err ) {
ignore = true
stopFn ( 0 , nil )
continue
2023-06-27 11:59:40 -07:00
}
}
2024-06-03 08:45:54 -07:00
stopFn ( version . Size , err )
2023-06-27 11:59:40 -07:00
if err != nil {
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , err )
2022-04-26 20:06:41 -07:00
failure = true
}
z . poolMetaMutex . Lock ( )
z . poolMeta . CountItem ( idx , 0 , failure )
z . poolMetaMutex . Unlock ( )
2022-07-05 07:37:24 -07:00
if ! failure {
// Success keep a count.
2023-06-21 13:23:20 -07:00
decommissioned ++
2022-01-10 09:07:49 -08:00
}
2024-03-06 03:43:16 -08:00
auditLogDecom ( ctx , "DecomCopyDeleteMarker" , bi . Name , version . Name , versionID , err )
2022-01-10 09:07:49 -08:00
continue
}
2022-04-26 20:06:41 -07:00
2022-01-10 09:07:49 -08:00
// gr.Close() is ensured by decommissionObject().
2022-05-11 11:37:32 -07:00
for try := 0 ; try < 3 ; try ++ {
2023-03-16 07:48:05 -07:00
if version . IsRemote ( ) {
if err := z . DecomTieredObject ( ctx , bi . Name , version . Name , version , ObjectOptions {
2024-08-09 19:30:44 -07:00
VersionID : versionID ,
MTime : version . ModTime ,
UserDefined : version . Metadata ,
SrcPoolIdx : idx ,
DataMovement : true ,
2023-03-16 07:48:05 -07:00
} ) ; err != nil {
2024-08-09 19:30:44 -07:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) || isDataMovementOverWriteErr ( err ) {
ignore = true
stopFn ( 0 , nil )
}
}
if ! ignore {
2024-06-03 08:45:54 -07:00
stopFn ( version . Size , err )
2024-08-09 19:30:44 -07:00
failure = err != nil
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , err )
2023-03-16 07:48:05 -07:00
}
break
}
2022-05-11 11:37:32 -07:00
gr , err := set . GetObjectNInfo ( ctx ,
2022-07-04 14:02:54 -07:00
bi . Name ,
2022-05-11 11:37:32 -07:00
encodeDirObject ( version . Name ) ,
nil ,
http . Header { } ,
ObjectOptions {
2023-05-25 09:18:49 -07:00
VersionID : versionID ,
2022-07-16 19:35:24 -07:00
NoDecryption : true ,
2023-04-17 12:16:37 -07:00
NoLock : true ,
2024-03-06 03:43:16 -08:00
NoAuditLog : true ,
2022-05-11 11:37:32 -07:00
} )
2022-10-06 17:41:58 +01:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
2022-05-18 17:58:19 -07:00
// object deleted by the application, nothing to do here we move on.
2022-10-06 17:41:58 +01:00
ignore = true
2024-08-09 19:30:44 -07:00
stopFn ( 0 , nil )
2022-10-06 17:41:58 +01:00
break
2022-05-18 17:58:19 -07:00
}
2023-05-11 13:35:16 -07:00
if err != nil && ! ignore {
// if usage-cache.bin is not readable log and ignore it.
if bi . Name == minioMetaBucket && strings . Contains ( version . Name , dataUsageCacheName ) {
ignore = true
2024-06-03 08:45:54 -07:00
stopFn ( version . Size , err )
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , err )
2023-05-11 13:35:16 -07:00
break
}
}
2022-05-11 11:37:32 -07:00
if err != nil {
failure = true
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , err )
2024-06-03 08:45:54 -07:00
stopFn ( version . Size , err )
2022-05-11 11:37:32 -07:00
continue
}
2024-08-09 19:30:44 -07:00
if err = z . decommissionObject ( ctx , idx , bi . Name , gr ) ; err != nil {
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) || isDataMovementOverWriteErr ( err ) {
ignore = true
stopFn ( 0 , nil )
break
}
2024-06-03 08:45:54 -07:00
stopFn ( version . Size , err )
2022-05-11 11:37:32 -07:00
failure = true
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , err )
2022-05-11 11:37:32 -07:00
continue
}
2024-06-03 08:45:54 -07:00
stopFn ( version . Size , nil )
2022-05-11 11:37:32 -07:00
failure = false
break
2022-04-26 20:06:41 -07:00
}
2022-10-06 17:41:58 +01:00
if ignore {
continue
}
2022-04-26 20:06:41 -07:00
z . poolMetaMutex . Lock ( )
z . poolMeta . CountItem ( idx , version . Size , failure )
z . poolMetaMutex . Unlock ( )
if failure {
break // break out on first error
2022-01-10 09:07:49 -08:00
}
2023-06-21 13:23:20 -07:00
decommissioned ++
2022-04-26 20:06:41 -07:00
}
// if all versions were decommissioned, then we can delete the object versions.
2023-06-21 13:23:20 -07:00
if decommissioned == len ( fivs . Versions ) {
2022-08-10 12:46:45 -07:00
stopFn := globalDecommissionMetrics . log ( decomMetricDecommissionRemoveObject , idx , bi . Name , entry . name )
2022-05-04 08:45:27 +01:00
_ , err := set . DeleteObject ( ctx ,
2022-07-04 14:02:54 -07:00
bi . Name ,
2022-05-18 17:58:19 -07:00
encodeDirObject ( entry . name ) ,
2022-01-10 09:07:49 -08:00
ObjectOptions {
2024-01-27 10:14:03 -08:00
DeletePrefix : true , // use prefix delete to delete all versions at once.
DeletePrefixObject : true , // use prefix delete on exact object (this is an optimization to avoid fan-out calls)
2024-03-06 03:43:16 -08:00
NoAuditLog : true ,
2022-04-26 20:06:41 -07:00
} ,
)
2024-06-03 08:45:54 -07:00
stopFn ( 0 , err )
2022-07-04 14:02:54 -07:00
auditLogDecom ( ctx , "DecomDeleteObject" , bi . Name , entry . name , "" , err )
2022-05-04 08:45:27 +01:00
if err != nil {
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , err )
2022-05-04 08:45:27 +01:00
}
2022-01-10 09:07:49 -08:00
}
z . poolMetaMutex . Lock ( )
2022-07-04 14:02:54 -07:00
z . poolMeta . TrackCurrentBucketObject ( idx , bi . Name , entry . name )
2022-01-11 18:48:43 -08:00
ok , err := z . poolMeta . updateAfter ( ctx , idx , z . serverPools , 30 * time . Second )
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , err )
2022-01-11 18:48:43 -08:00
if ok {
globalNotificationSys . ReloadPoolMeta ( ctx )
}
2022-01-10 09:07:49 -08:00
z . poolMetaMutex . Unlock ( )
}
2023-05-09 16:37:31 -07:00
wk . Take ( )
2023-07-19 13:09:37 -07:00
go func ( setIdx int ) {
2023-05-09 16:37:31 -07:00
defer wk . Give ( )
2023-07-19 13:09:37 -07:00
// We will perpetually retry listing if it fails, since we cannot
// possibly give up in this matter
for {
2024-03-06 03:43:16 -08:00
if contextCanceled ( ctx ) {
break
}
2023-07-19 13:09:37 -07:00
err := set . listObjectsToDecommission ( ctx , bi ,
func ( entry metaCacheEntry ) {
wk . Take ( )
go decommissionEntry ( entry )
} ,
)
2023-09-30 00:02:29 -07:00
if err == nil || errors . Is ( err , context . Canceled ) {
2023-07-19 13:09:37 -07:00
break
}
setN := humanize . Ordinal ( setIdx + 1 )
2023-09-30 00:02:29 -07:00
retryDur := time . Duration ( rand . Float64 ( ) * float64 ( 5 * time . Second ) )
2024-04-04 13:04:40 +01:00
decomLogOnceIf ( ctx , fmt . Errorf ( "listing objects from %s set failed with %v, retrying in %v" , setN , err , retryDur ) , "decom-listing-failed" + setN )
2023-07-19 13:09:37 -07:00
time . Sleep ( retryDur )
}
} ( setIdx )
2022-01-10 09:07:49 -08:00
}
2023-05-09 16:37:31 -07:00
wk . Wait ( )
2022-01-10 09:07:49 -08:00
return nil
}
2022-08-10 12:46:45 -07:00
// decomMetrics is the receiver type for decommission trace helpers. It
// carries no state of its own.
//
//msgp:ignore decomMetrics
type decomMetrics struct{}

// globalDecommissionMetrics is the shared decomMetrics value used by the
// decommission code paths in this file.
var globalDecommissionMetrics decomMetrics
// decomMetric identifies which decommission step a trace entry refers to.
//
//msgp:ignore decomMetric
//go:generate stringer -type=decomMetric -trimprefix=decomMetric $GOFILE
type decomMetric uint8

const (
	// decomMetricDecommissionBucket marks a whole-bucket decommission step.
	decomMetricDecommissionBucket decomMetric = iota
	// decomMetricDecommissionObject marks the move of a single object version.
	decomMetricDecommissionObject
	// decomMetricDecommissionRemoveObject marks the removal of an object
	// from the source once its versions have been handled.
	decomMetricDecommissionRemoveObject
)
2024-06-03 08:45:54 -07:00
func decomTrace ( d decomMetric , poolIdx int , startTime time . Time , duration time . Duration , path string , err error , sz int64 ) madmin . TraceInfo {
2022-08-10 12:46:45 -07:00
var errStr string
if err != nil {
errStr = err . Error ( )
}
return madmin . TraceInfo {
TraceType : madmin . TraceDecommission ,
Time : startTime ,
NodeName : globalLocalNodeName ,
FuncName : fmt . Sprintf ( "decommission.%s (pool-id=%d)" , d . String ( ) , poolIdx ) ,
Duration : duration ,
Path : path ,
Error : errStr ,
2024-06-03 08:45:54 -07:00
Bytes : sz ,
2022-08-10 12:46:45 -07:00
}
}
2024-06-03 08:45:54 -07:00
func ( m * decomMetrics ) log ( d decomMetric , poolIdx int , paths ... string ) func ( z int64 , err error ) {
2022-08-10 12:46:45 -07:00
startTime := time . Now ( )
2024-06-03 08:45:54 -07:00
return func ( sz int64 , err error ) {
2022-08-10 12:46:45 -07:00
duration := time . Since ( startTime )
if globalTrace . NumSubscribers ( madmin . TraceDecommission ) > 0 {
2024-06-03 08:45:54 -07:00
globalTrace . Publish ( decomTrace ( d , poolIdx , startTime , duration , strings . Join ( paths , " " ) , err , sz ) )
2022-08-10 12:46:45 -07:00
}
}
}
2022-01-10 09:07:49 -08:00
func ( z * erasureServerPools ) decommissionInBackground ( ctx context . Context , idx int ) error {
pool := z . serverPools [ idx ]
2023-10-12 15:30:42 -07:00
z . poolMetaMutex . RLock ( )
pending := z . poolMeta . PendingBuckets ( idx )
z . poolMetaMutex . RUnlock ( )
for _ , bucket := range pending {
z . poolMetaMutex . RLock ( )
isDecommissioned := z . poolMeta . isBucketDecommissioned ( idx , bucket . String ( ) )
z . poolMetaMutex . RUnlock ( )
if isDecommissioned {
2022-01-10 17:26:00 -08:00
if serverDebugLog {
console . Debugln ( "decommission: already done, moving on" , bucket )
}
2022-01-10 09:07:49 -08:00
z . poolMetaMutex . Lock ( )
2024-03-18 15:25:45 -07:00
if z . poolMeta . BucketDone ( idx , bucket ) {
// remove from pendingBuckets and persist.
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , z . poolMeta . save ( ctx , z . serverPools ) )
2024-03-18 15:25:45 -07:00
}
2022-01-10 09:07:49 -08:00
z . poolMetaMutex . Unlock ( )
continue
}
2022-01-10 17:26:00 -08:00
if serverDebugLog {
2022-07-04 14:02:54 -07:00
console . Debugln ( "decommission: currently on bucket" , bucket . Name )
2022-01-10 17:26:00 -08:00
}
2022-08-10 12:46:45 -07:00
stopFn := globalDecommissionMetrics . log ( decomMetricDecommissionBucket , idx , bucket . Name )
2022-01-10 09:07:49 -08:00
if err := z . decommissionPool ( ctx , idx , pool , bucket ) ; err != nil {
2024-06-03 08:45:54 -07:00
stopFn ( 0 , err )
2022-01-10 09:07:49 -08:00
return err
}
2024-06-03 08:45:54 -07:00
stopFn ( 0 , nil )
2022-08-10 12:46:45 -07:00
2022-01-10 09:07:49 -08:00
z . poolMetaMutex . Lock ( )
2024-03-18 15:25:45 -07:00
if z . poolMeta . BucketDone ( idx , bucket ) {
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , z . poolMeta . save ( ctx , z . serverPools ) )
2024-03-18 15:25:45 -07:00
}
2022-01-10 09:07:49 -08:00
z . poolMetaMutex . Unlock ( )
}
return nil
}
2023-01-12 01:22:51 +01:00
func ( z * erasureServerPools ) checkAfterDecom ( ctx context . Context , idx int ) error {
buckets , err := z . getBucketsToDecommission ( ctx )
if err != nil {
return err
}
pool := z . serverPools [ idx ]
for _ , set := range pool . sets {
for _ , bi := range buckets {
2024-04-22 10:49:30 -07:00
var vc * versioning . Versioning
var lc * lifecycle . Lifecycle
var lr objectlock . Retention
var rcfg * replication . Config
if bi . Name != minioMetaBucket {
vc , err = globalBucketVersioningSys . Get ( bi . Name )
if err != nil {
return err
}
2023-05-11 13:35:16 -07:00
2024-04-22 10:49:30 -07:00
// Check if the current bucket has a configured lifecycle policy
lc , err = globalLifecycleSys . Get ( bi . Name )
if err != nil && ! errors . Is ( err , BucketLifecycleNotFound { Bucket : bi . Name } ) {
return err
}
2023-05-11 13:35:16 -07:00
2024-04-22 10:49:30 -07:00
// Check if bucket is object locked.
lr , err = globalBucketObjectLockSys . Get ( bi . Name )
if err != nil {
return err
}
rcfg , err = getReplicationConfig ( ctx , bi . Name )
if err != nil {
return err
}
}
2023-05-11 13:35:16 -07:00
filterLifecycle := func ( bucket , object string , fi FileInfo ) bool {
if lc == nil {
return false
}
versioned := vc != nil && vc . Versioned ( object )
objInfo := fi . ToObjectInfo ( bucket , object , versioned )
2023-10-06 05:55:15 -07:00
evt := evalActionFromLifecycle ( ctx , * lc , lr , rcfg , objInfo )
2023-05-11 13:35:16 -07:00
switch {
case evt . Action . DeleteRestored ( ) : // if restored copy has expired,delete it synchronously
2023-05-22 15:28:56 -07:00
applyExpiryOnTransitionedObject ( ctx , z , objInfo , evt , lcEventSrc_Decom )
2023-05-11 13:35:16 -07:00
return false
case evt . Action . Delete ( ) :
2023-05-22 15:28:56 -07:00
globalExpiryState . enqueueByDays ( objInfo , evt , lcEventSrc_Decom )
2023-05-11 13:35:16 -07:00
return true
default :
return false
}
}
var versionsFound int
2024-04-22 10:49:30 -07:00
if err = set . listObjectsToDecommission ( ctx , bi , func ( entry metaCacheEntry ) {
2023-05-11 13:35:16 -07:00
if ! entry . isObject ( ) {
return
}
fivs , err := entry . fileInfoVersions ( bi . Name )
if err != nil {
return
}
// We need a reversed order for decommissioning,
// to create the appropriate stack.
versionsSorter ( fivs . Versions ) . reverse ( )
for _ , version := range fivs . Versions {
// Apply lifecycle rules on the objects that are expired.
if filterLifecycle ( bi . Name , version . Name , version ) {
continue
}
// `.usage-cache.bin` still exists, must be not readable ignore it.
if bi . Name == minioMetaBucket && strings . Contains ( version . Name , dataUsageCacheName ) {
// skipping bucket usage cache name, as its autogenerated.
continue
}
versionsFound ++
2023-01-12 01:22:51 +01:00
}
2024-04-22 10:49:30 -07:00
} ) ; err != nil {
2023-01-12 01:22:51 +01:00
return err
}
2023-05-11 13:35:16 -07:00
if versionsFound > 0 {
return fmt . Errorf ( "at least %d object(s)/version(s) were found in bucket `%s` after decommissioning" , versionsFound , bi . Name )
2023-01-12 01:22:51 +01:00
}
}
}
return nil
}
2022-01-10 09:07:49 -08:00
func ( z * erasureServerPools ) doDecommissionInRoutine ( ctx context . Context , idx int ) {
z . poolMetaMutex . Lock ( )
var dctx context . Context
dctx , z . decommissionCancelers [ idx ] = context . WithCancel ( GlobalContext )
z . poolMetaMutex . Unlock ( )
2022-05-04 08:45:27 +01:00
// Generate an empty request info so it can be directly modified later by audit
dctx = logger . SetReqInfo ( dctx , & logger . ReqInfo { } )
2022-01-10 09:07:49 -08:00
if err := z . decommissionInBackground ( dctx , idx ) ; err != nil {
2024-04-04 13:04:40 +01:00
decomLogIf ( GlobalContext , err )
decomLogIf ( GlobalContext , z . DecommissionFailed ( dctx , idx ) )
2022-01-10 09:07:49 -08:00
return
}
2022-04-26 20:06:41 -07:00
z . poolMetaMutex . Lock ( )
2023-01-16 21:36:34 +05:30
failed := z . poolMeta . Pools [ idx ] . Decommission . ItemsDecommissionFailed > 0 || contextCanceled ( dctx )
poolCmdLine := z . poolMeta . Pools [ idx ] . CmdLine
2022-04-26 20:06:41 -07:00
z . poolMetaMutex . Unlock ( )
2023-01-12 01:22:51 +01:00
if ! failed {
2024-04-04 13:04:40 +01:00
decomLogEvent ( dctx , "Decommissioning complete for pool '%s', verifying for any pending objects" , poolCmdLine )
2023-01-12 01:22:51 +01:00
err := z . checkAfterDecom ( dctx , idx )
if err != nil {
2024-04-04 13:04:40 +01:00
decomLogIf ( ctx , err )
2023-01-12 01:22:51 +01:00
failed = true
}
}
2022-04-26 20:06:41 -07:00
if failed {
// Decommission failed indicate as such.
2024-04-04 13:04:40 +01:00
decomLogIf ( GlobalContext , z . DecommissionFailed ( dctx , idx ) )
2022-04-26 20:06:41 -07:00
} else {
// Complete the decommission..
2024-04-04 13:04:40 +01:00
decomLogIf ( GlobalContext , z . CompleteDecommission ( dctx , idx ) )
2022-04-26 20:06:41 -07:00
}
2022-01-10 09:07:49 -08:00
}
2022-01-11 12:27:47 -08:00
func ( z * erasureServerPools ) IsSuspended ( idx int ) bool {
2022-04-06 23:42:05 -07:00
z . poolMetaMutex . RLock ( )
defer z . poolMetaMutex . RUnlock ( )
2022-01-11 12:27:47 -08:00
return z . poolMeta . IsSuspended ( idx )
}
2022-01-10 09:07:49 -08:00
// Decommission - start decommission session.
2023-01-16 21:36:34 +05:30
func ( z * erasureServerPools ) Decommission ( ctx context . Context , indices ... int ) error {
if len ( indices ) == 0 {
2022-01-10 09:07:49 -08:00
return errInvalidArgument
}
if z . SinglePool ( ) {
return errInvalidArgument
}
// Make pool unwritable before decommissioning.
2023-01-16 21:36:34 +05:30
if err := z . StartDecommission ( ctx , indices ... ) ; err != nil {
2022-01-10 09:07:49 -08:00
return err
}
2023-06-23 12:29:32 -07:00
go func ( ) {
for _ , idx := range indices {
// decommission all pools serially one after
// the other.
z . doDecommissionInRoutine ( ctx , idx )
}
} ( )
2022-01-10 09:07:49 -08:00
// Successfully started decommissioning.
return nil
}
2022-01-11 18:48:43 -08:00
// decomError is an error whose message is the plain string held in Err.
type decomError struct {
	Err string
}

// Error implements the error interface by returning Err unchanged.
func (e decomError) Error() string {
	return e.Err
}
// poolSpaceInfo holds the capacity figures of a pool as computed by
// getDecommissionPoolSpaceInfo: Free and Total are the usable free and
// usable total capacity, and Used is Total minus Free.
type poolSpaceInfo struct {
	Free, Total, Used int64
}
func ( z * erasureServerPools ) getDecommissionPoolSpaceInfo ( idx int ) ( pi poolSpaceInfo , err error ) {
if idx < 0 {
return pi , errInvalidArgument
}
if idx + 1 > len ( z . serverPools ) {
return pi , errInvalidArgument
}
2022-07-15 21:03:23 -07:00
2022-12-01 14:31:35 -08:00
info := z . serverPools [ idx ] . StorageInfo ( context . Background ( ) )
2022-03-30 10:48:35 -07:00
info . Backend = z . BackendInfo ( )
2022-01-11 18:48:43 -08:00
usableTotal := int64 ( GetTotalUsableCapacity ( info . Disks , info ) )
usableFree := int64 ( GetTotalUsableCapacityFree ( info . Disks , info ) )
return poolSpaceInfo {
Total : usableTotal ,
Free : usableFree ,
Used : usableTotal - usableFree ,
} , nil
}
2022-01-10 09:07:49 -08:00
func ( z * erasureServerPools ) Status ( ctx context . Context , idx int ) ( PoolStatus , error ) {
if idx < 0 {
return PoolStatus { } , errInvalidArgument
}
2022-01-11 18:48:43 -08:00
pi , err := z . getDecommissionPoolSpaceInfo ( idx )
if err != nil {
2022-07-01 16:21:23 -07:00
return PoolStatus { } , err
2022-01-10 09:07:49 -08:00
}
2023-09-16 02:28:06 -07:00
z . poolMetaMutex . RLock ( )
defer z . poolMetaMutex . RUnlock ( )
poolInfo := z . poolMeta . Pools [ idx ] . Clone ( )
2022-01-10 09:07:49 -08:00
if poolInfo . Decommission != nil {
2022-01-11 18:48:43 -08:00
poolInfo . Decommission . TotalSize = pi . Total
2024-06-06 15:30:43 +01:00
poolInfo . Decommission . CurrentSize = pi . Free
2022-01-10 09:07:49 -08:00
} else {
poolInfo . Decommission = & PoolDecommissionInfo {
2022-01-11 18:48:43 -08:00
TotalSize : pi . Total ,
CurrentSize : pi . Free ,
2022-01-10 09:07:49 -08:00
}
}
return poolInfo , nil
}
func ( z * erasureServerPools ) ReloadPoolMeta ( ctx context . Context ) ( err error ) {
meta := poolMeta { }
2022-01-14 10:32:35 -08:00
if err = meta . load ( ctx , z . serverPools [ 0 ] , z . serverPools ) ; err != nil {
2022-01-10 09:07:49 -08:00
return err
}
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
z . poolMeta = meta
return nil
}
func ( z * erasureServerPools ) DecommissionCancel ( ctx context . Context , idx int ) ( err error ) {
if idx < 0 {
return errInvalidArgument
}
if z . SinglePool ( ) {
return errInvalidArgument
}
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
2023-07-10 07:55:38 -07:00
fn := z . decommissionCancelers [ idx ]
if fn == nil {
// canceling a decommission before it started return an error.
return errDecommissionNotStarted
}
defer fn ( ) // cancel any active thread.
2022-01-10 09:07:49 -08:00
if z . poolMeta . DecommissionCancel ( idx ) {
if err = z . poolMeta . save ( ctx , z . serverPools ) ; err != nil {
return err
}
globalNotificationSys . ReloadPoolMeta ( ctx )
}
2023-07-10 07:55:38 -07:00
2022-01-10 09:07:49 -08:00
return nil
}
func ( z * erasureServerPools ) DecommissionFailed ( ctx context . Context , idx int ) ( err error ) {
if idx < 0 {
return errInvalidArgument
}
if z . SinglePool ( ) {
return errInvalidArgument
}
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
if z . poolMeta . DecommissionFailed ( idx ) {
2022-07-07 12:31:44 -07:00
if fn := z . decommissionCancelers [ idx ] ; fn != nil {
2023-07-10 07:55:38 -07:00
defer fn ( )
} // cancel any active thread.
2022-01-10 09:07:49 -08:00
if err = z . poolMeta . save ( ctx , z . serverPools ) ; err != nil {
return err
}
globalNotificationSys . ReloadPoolMeta ( ctx )
}
return nil
}
func ( z * erasureServerPools ) CompleteDecommission ( ctx context . Context , idx int ) ( err error ) {
if idx < 0 {
return errInvalidArgument
}
if z . SinglePool ( ) {
return errInvalidArgument
}
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
if z . poolMeta . DecommissionComplete ( idx ) {
2022-07-07 12:31:44 -07:00
if fn := z . decommissionCancelers [ idx ] ; fn != nil {
2023-07-10 07:55:38 -07:00
defer fn ( )
} // cancel any active thread.
2022-01-10 09:07:49 -08:00
if err = z . poolMeta . save ( ctx , z . serverPools ) ; err != nil {
return err
}
globalNotificationSys . ReloadPoolMeta ( ctx )
}
return nil
}
2023-01-12 01:22:51 +01:00
func ( z * erasureServerPools ) getBucketsToDecommission ( ctx context . Context ) ( [ ] decomBucketInfo , error ) {
buckets , err := z . ListBuckets ( ctx , BucketOptions { } )
if err != nil {
return nil , err
}
decomBuckets := make ( [ ] decomBucketInfo , len ( buckets ) )
for i := range buckets {
decomBuckets [ i ] = decomBucketInfo {
Name : buckets [ i ] . Name ,
}
}
// Buckets data are dispersed in multiple zones/sets, make
// sure to decommission the necessary metadata.
2024-04-02 12:18:26 +05:30
decomMetaBuckets := [ ] decomBucketInfo {
{
Name : minioMetaBucket ,
Prefix : minioConfigPrefix ,
} ,
{
Name : minioMetaBucket ,
Prefix : bucketMetaPrefix ,
} ,
}
2023-01-12 01:22:51 +01:00
2024-04-02 12:18:26 +05:30
return append ( decomMetaBuckets , decomBuckets ... ) , nil
2023-01-12 01:22:51 +01:00
}
2023-01-16 21:36:34 +05:30
func ( z * erasureServerPools ) StartDecommission ( ctx context . Context , indices ... int ) ( err error ) {
if len ( indices ) == 0 {
2022-01-10 09:07:49 -08:00
return errInvalidArgument
}
if z . SinglePool ( ) {
return errInvalidArgument
}
2023-01-12 01:22:51 +01:00
decomBuckets , err := z . getBucketsToDecommission ( ctx )
2022-01-10 09:07:49 -08:00
if err != nil {
return err
}
2023-01-12 01:22:51 +01:00
for _ , bucket := range decomBuckets {
2022-07-15 21:03:23 -07:00
z . HealBucket ( ctx , bucket . Name , madmin . HealOpts { } )
}
2023-05-09 16:37:31 -07:00
// Create .minio.sys/config, .minio.sys/buckets paths if missing,
2022-03-07 16:18:57 -08:00
// this code is present to avoid any missing meta buckets on other
// pools.
for _ , metaBucket := range [ ] string {
pathJoin ( minioMetaBucket , minioConfigPrefix ) ,
pathJoin ( minioMetaBucket , bucketMetaPrefix ) ,
} {
2022-03-14 11:25:24 -07:00
var bucketExists BucketExists
2022-12-23 07:46:00 -08:00
if err = z . MakeBucket ( ctx , metaBucket , MakeBucketOptions { } ) ; err != nil {
2022-03-14 11:25:24 -07:00
if ! errors . As ( err , & bucketExists ) {
return err
}
2022-03-07 16:18:57 -08:00
}
}
2023-01-16 21:36:34 +05:30
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
for _ , idx := range indices {
pi , err := z . getDecommissionPoolSpaceInfo ( idx )
if err != nil {
return err
2022-01-10 09:07:49 -08:00
}
2023-01-16 21:36:34 +05:30
if err = z . poolMeta . Decommission ( idx , pi ) ; err != nil {
return err
}
2022-01-10 09:07:49 -08:00
2023-01-16 21:36:34 +05:30
z . poolMeta . QueueBuckets ( idx , decomBuckets )
2022-01-10 09:07:49 -08:00
}
if err = z . poolMeta . save ( ctx , z . serverPools ) ; err != nil {
return err
}
2023-01-16 21:36:34 +05:30
2022-01-10 09:07:49 -08:00
globalNotificationSys . ReloadPoolMeta ( ctx )
2023-01-16 21:36:34 +05:30
2022-01-10 09:07:49 -08:00
return nil
}
2022-05-04 08:45:27 +01:00
func auditLogDecom ( ctx context . Context , apiName , bucket , object , versionID string , err error ) {
errStr := ""
if err != nil {
errStr = err . Error ( )
}
2022-10-24 19:35:07 +01:00
auditLogInternal ( ctx , AuditLogOptions {
2022-07-12 10:43:32 -07:00
Event : "decommission" ,
2022-05-04 08:45:27 +01:00
APIName : apiName ,
2022-10-24 19:35:07 +01:00
Bucket : bucket ,
Object : object ,
2022-05-04 08:45:27 +01:00
VersionID : versionID ,
Error : errStr ,
} )
}