2022-01-10 12:07:49 -05:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"encoding/binary"
"errors"
"fmt"
2022-07-07 15:31:44 -04:00
"math/rand"
2022-01-10 12:07:49 -05:00
"net/http"
"sort"
2022-04-08 02:19:13 -04:00
"strconv"
2022-04-28 19:27:53 -04:00
"strings"
2022-04-08 02:19:13 -04:00
"sync"
2022-01-10 12:07:49 -05:00
"time"
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
2022-01-10 20:26:00 -05:00
"github.com/minio/pkg/console"
2022-04-08 02:19:13 -04:00
"github.com/minio/pkg/env"
2022-01-10 12:07:49 -05:00
)
// PoolDecommissionInfo holds the progress and outcome of decommissioning a
// single pool. Fields tagged json:"-" are left out of JSON output and only
// carry msg tags.
type PoolDecommissionInfo struct {
// StartTime is set when the decommission is started and reset to the
// zero time when it is marked failed or canceled.
StartTime time . Time ` json:"startTime" msg:"st" `
// StartSize is the free space recorded when the decommission started.
StartSize int64 ` json:"startSize" msg:"ss" `
// TotalSize is the total usable capacity of the pool.
TotalSize int64 ` json:"totalSize" msg:"ts" `
// CurrentSize starts out equal to the free space at start.
CurrentSize int64 ` json:"currentSize" msg:"cs" `
// Complete, Failed and Canceled describe the outcome; the poolMeta
// setters keep at most one of them true at a time.
Complete bool ` json:"complete" msg:"cmp" `
Failed bool ` json:"failed" msg:"fl" `
Canceled bool ` json:"canceled" msg:"cnl" `
// Internal information.
// QueuedBuckets are still pending, DecommissionedBuckets are done.
QueuedBuckets [ ] string ` json:"-" msg:"bkts" `
DecommissionedBuckets [ ] string ` json:"-" msg:"dbkts" `
// Last bucket/object decommissioned.
Bucket string ` json:"-" msg:"bkt" `
2022-07-04 17:02:54 -04:00
// Captures prefix that is currently being
// decommissioned inside the 'Bucket'
Prefix string ` json:"-" msg:"pfx" `
2022-01-10 12:07:49 -05:00
Object string ` json:"-" msg:"obj" `
// Verbose information
// Item and byte counters, split by success and failure.
2022-01-11 21:48:43 -05:00
ItemsDecommissioned int64 ` json:"-" msg:"id" `
ItemsDecommissionFailed int64 ` json:"-" msg:"idf" `
BytesDone int64 ` json:"-" msg:"bd" `
BytesFailed int64 ` json:"-" msg:"bf" `
2022-01-10 12:07:49 -05:00
}
// bucketPop should be called when a bucket is done decommissioning.
// Adds the bucket to the list of decommissioned buckets and updates resume numbers.
func ( pd * PoolDecommissionInfo ) bucketPop ( bucket string ) {
pd . DecommissionedBuckets = append ( pd . DecommissionedBuckets , bucket )
for i , b := range pd . QueuedBuckets {
if b == bucket {
// Bucket is done.
pd . QueuedBuckets = append ( pd . QueuedBuckets [ : i ] , pd . QueuedBuckets [ i + 1 : ] ... )
2022-01-10 20:26:00 -05:00
// Clear tracker info.
if pd . Bucket == bucket {
pd . Bucket = "" // empty this out for next bucket
2022-07-04 17:02:54 -04:00
pd . Prefix = "" // empty this out for the next bucket
2022-01-10 20:26:00 -05:00
pd . Object = "" // empty this out for next object
}
return
2022-01-10 12:07:49 -05:00
}
}
}
// isBucketDecommissioned reports whether bucket is listed in
// DecommissionedBuckets.
func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
	done := pd.DecommissionedBuckets
	for i := 0; i < len(done); i++ {
		if done[i] == bucket {
			return true
		}
	}
	return false
}
2022-07-04 17:02:54 -04:00
func ( pd * PoolDecommissionInfo ) bucketPush ( bucket decomBucketInfo ) {
2022-01-10 12:07:49 -05:00
for _ , b := range pd . QueuedBuckets {
if pd . isBucketDecommissioned ( b ) {
return
}
2022-07-04 17:02:54 -04:00
if b == bucket . String ( ) {
2022-01-10 12:07:49 -05:00
return
}
}
2022-07-04 17:02:54 -04:00
pd . QueuedBuckets = append ( pd . QueuedBuckets , bucket . String ( ) )
pd . Bucket = bucket . Name
pd . Prefix = bucket . Prefix
2022-01-10 12:07:49 -05:00
}
// PoolStatus captures current pool status: the pool's ID, the command-line
// string it is identified by, when the entry was last updated, and its
// decommission state (nil when no decommission is recorded).
type PoolStatus struct {
ID int ` json:"id" msg:"id" `
CmdLine string ` json:"cmdline" msg:"cl" `
LastUpdate time . Time ` json:"lastUpdate" msg:"lu" `
Decommission * PoolDecommissionInfo ` json:"decommissionInfo,omitempty" msg:"dec" `
}
//go:generate msgp -file $GOFILE -unexported
// poolMeta is the persisted pool metadata: a version number (checked on
// load) and one PoolStatus entry per pool.
type poolMeta struct {
Version int ` msg:"v" `
Pools [ ] PoolStatus ` msg:"pls" `
}
// returnResumablePools lists the pools whose decommission is worth resuming
// upon restart of a cluster: a decommission is recorded and it is neither
// complete nor canceled.
//
// When n > 0 at most n pools are returned. When n <= 0 every resumable pool
// is returned. Previously the function fell through to `return nil` whenever
// fewer than n pools matched (or n <= 0), dropping the collected pools.
func (p *poolMeta) returnResumablePools(n int) []PoolStatus {
	var newPools []PoolStatus
	for _, pool := range p.Pools {
		if pool.Decommission == nil {
			continue
		}
		if pool.Decommission.Complete || pool.Decommission.Canceled {
			// Do not resume decommission upon startup for
			// - decommission complete
			// - decommission canceled
			continue
		}
		// In all other situations we need to resume.
		newPools = append(newPools, pool)
		if n > 0 && len(newPools) == n {
			break
		}
	}
	return newPools
}
func ( p * poolMeta ) DecommissionComplete ( idx int ) bool {
2022-04-07 02:42:05 -04:00
if p . Pools [ idx ] . Decommission != nil && ! p . Pools [ idx ] . Decommission . Complete {
2022-01-11 21:48:43 -05:00
p . Pools [ idx ] . LastUpdate = UTCNow ( )
2022-01-10 12:07:49 -05:00
p . Pools [ idx ] . Decommission . Complete = true
p . Pools [ idx ] . Decommission . Failed = false
p . Pools [ idx ] . Decommission . Canceled = false
return true
}
return false
}
func ( p * poolMeta ) DecommissionFailed ( idx int ) bool {
2022-04-07 02:42:05 -04:00
if p . Pools [ idx ] . Decommission != nil && ! p . Pools [ idx ] . Decommission . Failed {
2022-01-11 21:48:43 -05:00
p . Pools [ idx ] . LastUpdate = UTCNow ( )
2022-01-10 12:07:49 -05:00
p . Pools [ idx ] . Decommission . StartTime = time . Time { }
p . Pools [ idx ] . Decommission . Complete = false
p . Pools [ idx ] . Decommission . Failed = true
p . Pools [ idx ] . Decommission . Canceled = false
return true
}
return false
}
func ( p * poolMeta ) DecommissionCancel ( idx int ) bool {
2022-04-07 02:42:05 -04:00
if p . Pools [ idx ] . Decommission != nil && ! p . Pools [ idx ] . Decommission . Canceled {
2022-01-11 21:48:43 -05:00
p . Pools [ idx ] . LastUpdate = UTCNow ( )
2022-01-10 12:07:49 -05:00
p . Pools [ idx ] . Decommission . StartTime = time . Time { }
p . Pools [ idx ] . Decommission . Complete = false
p . Pools [ idx ] . Decommission . Failed = false
p . Pools [ idx ] . Decommission . Canceled = true
return true
}
return false
}
// isBucketDecommissioned reports whether bucket is recorded as already
// decommissioned for pool idx. It returns false when no decommission is
// recorded for the pool, matching the nil handling of the other poolMeta
// accessors instead of dereferencing a nil pointer.
func (p poolMeta) isBucketDecommissioned(idx int, bucket string) bool {
	decom := p.Pools[idx].Decommission
	if decom == nil {
		// Decommission not in progress.
		return false
	}
	return decom.isBucketDecommissioned(bucket)
}
2022-07-04 17:02:54 -04:00
func ( p * poolMeta ) BucketDone ( idx int , bucket decomBucketInfo ) {
2022-01-10 12:07:49 -05:00
if p . Pools [ idx ] . Decommission == nil {
// Decommission not in progress.
return
}
2022-07-04 17:02:54 -04:00
p . Pools [ idx ] . Decommission . bucketPop ( bucket . String ( ) )
2022-01-10 12:07:49 -05:00
}
// ResumeBucketObject returns the bucket and object last tracked for pool idx.
// Both are empty when no decommission is recorded for the pool.
func (p poolMeta) ResumeBucketObject(idx int) (bucket, object string) {
	decom := p.Pools[idx].Decommission
	if decom == nil {
		return "", ""
	}
	return decom.Bucket, decom.Object
}
// TrackCurrentBucketObject stores bucket and object as the current position
// of the decommission of pool idx.
// It does nothing when no decommission is recorded for the pool.
func (p *poolMeta) TrackCurrentBucketObject(idx int, bucket string, object string) {
	decom := p.Pools[idx].Decommission
	if decom == nil {
		// Decommission not in progress.
		return
	}
	decom.Bucket, decom.Object = bucket, object
}
2022-07-04 17:02:54 -04:00
func ( p * poolMeta ) PendingBuckets ( idx int ) [ ] decomBucketInfo {
2022-01-10 12:07:49 -05:00
if p . Pools [ idx ] . Decommission == nil {
// Decommission not in progress.
return nil
}
2022-07-04 17:02:54 -04:00
decomBuckets := make ( [ ] decomBucketInfo , len ( p . Pools [ idx ] . Decommission . QueuedBuckets ) )
for i := range decomBuckets {
bucket , prefix := path2BucketObject ( p . Pools [ idx ] . Decommission . QueuedBuckets [ i ] )
decomBuckets [ i ] = decomBucketInfo {
Name : bucket ,
Prefix : prefix ,
}
}
return decomBuckets
}
//msgp:ignore decomBucketInfo
// decomBucketInfo names a unit of decommission work: a bucket and a prefix
// inside that bucket.
type decomBucketInfo struct {
Name string
Prefix string
}
func ( db decomBucketInfo ) String ( ) string {
return pathJoin ( db . Name , db . Prefix )
2022-01-10 12:07:49 -05:00
}
2022-07-04 17:02:54 -04:00
func ( p * poolMeta ) QueueBuckets ( idx int , buckets [ ] decomBucketInfo ) {
2022-01-10 12:07:49 -05:00
// add new queued buckets
for _ , bucket := range buckets {
2022-07-04 17:02:54 -04:00
p . Pools [ idx ] . Decommission . bucketPush ( bucket )
2022-01-10 12:07:49 -05:00
}
}
// Sentinel errors for decommission state.
var (
// errDecommissionAlreadyRunning is returned when a decommission is
// requested while one is already recorded.
errDecommissionAlreadyRunning = errors . New ( "decommission is already in progress" )
// errDecommissionComplete signals that the decommission has finished and
// the pool should be removed from the command line.
errDecommissionComplete = errors . New ( "decommission is complete, please remove the servers from command-line" )
)
2022-01-11 21:48:43 -05:00
func ( p * poolMeta ) Decommission ( idx int , pi poolSpaceInfo ) error {
2022-01-10 12:07:49 -05:00
for i , pool := range p . Pools {
if idx == i {
continue
}
if pool . Decommission != nil {
// Do not allow multiple decommissions at the same time.
// We shall for now only allow one pool decommission at
// a time.
return fmt . Errorf ( "%w at index: %d" , errDecommissionAlreadyRunning , i )
}
}
2022-01-11 21:48:43 -05:00
2022-05-03 16:36:08 -04:00
// Return an error when there is decommission on going - the user needs
// to explicitly cancel it first in order to restart decommissioning again.
if p . Pools [ idx ] . Decommission != nil &&
! p . Pools [ idx ] . Decommission . Complete &&
! p . Pools [ idx ] . Decommission . Failed &&
! p . Pools [ idx ] . Decommission . Canceled {
return errDecommissionAlreadyRunning
2022-01-10 12:07:49 -05:00
}
2022-05-03 16:36:08 -04:00
now := UTCNow ( )
p . Pools [ idx ] . LastUpdate = now
p . Pools [ idx ] . Decommission = & PoolDecommissionInfo {
StartTime : now ,
StartSize : pi . Free ,
CurrentSize : pi . Free ,
TotalSize : pi . Total ,
2022-01-10 12:07:49 -05:00
}
2022-05-03 16:36:08 -04:00
return nil
2022-01-10 12:07:49 -05:00
}
// IsSuspended reports whether pool idx has a decommission record of any
// state.
func (p poolMeta) IsSuspended(idx int) bool {
	hasRecord := p.Pools[idx].Decommission != nil
	return hasRecord
}
2022-01-14 13:32:35 -05:00
func ( p * poolMeta ) validate ( pools [ ] * erasureSets ) ( bool , error ) {
2022-01-10 12:07:49 -05:00
type poolInfo struct {
2022-01-14 13:32:35 -05:00
position int
completed bool
decomStarted bool // started but not finished yet
2022-01-10 12:07:49 -05:00
}
rememberedPools := make ( map [ string ] poolInfo )
for idx , pool := range p . Pools {
complete := false
2022-01-14 13:32:35 -05:00
decomStarted := false
if pool . Decommission != nil {
if pool . Decommission . Complete {
complete = true
}
decomStarted = true
2022-01-10 12:07:49 -05:00
}
rememberedPools [ pool . CmdLine ] = poolInfo {
2022-01-14 13:32:35 -05:00
position : idx ,
completed : complete ,
decomStarted : decomStarted ,
2022-01-10 12:07:49 -05:00
}
}
specifiedPools := make ( map [ string ] int )
for idx , pool := range pools {
specifiedPools [ pool . endpoints . CmdLine ] = idx
}
2022-04-28 19:27:53 -04:00
replaceScheme := func ( k string ) string {
// This is needed as fallback when users are changeing
// from http->https or https->http, we need to verify
// both because MinIO remembers the command-line in
// "exact" order - as long as this order is not disturbed
// we allow changing the "scheme" i.e internode communication
// from plain-text to TLS or from TLS to plain-text.
if strings . HasPrefix ( k , "http://" ) {
k = strings . ReplaceAll ( k , "http://" , "https://" )
} else if strings . HasPrefix ( k , "https://" ) {
k = strings . ReplaceAll ( k , "https://" , "http://" )
}
return k
}
var update bool
2022-01-10 12:07:49 -05:00
// Check if specified pools need to remove decommissioned pool.
for k := range specifiedPools {
pi , ok := rememberedPools [ k ]
2022-04-28 19:27:53 -04:00
if ! ok {
pi , ok = rememberedPools [ replaceScheme ( k ) ]
if ok {
update = true // Looks like user is changing from http->https or https->http
}
}
2022-01-10 12:07:49 -05:00
if ok && pi . completed {
return false , fmt . Errorf ( "pool(%s) = %s is decommissioned, please remove from server command line" , humanize . Ordinal ( pi . position + 1 ) , k )
}
}
// check if remembered pools are in right position or missing from command line.
for k , pi := range rememberedPools {
if pi . completed {
continue
}
_ , ok := specifiedPools [ k ]
2022-04-28 19:27:53 -04:00
if ! ok {
_ , ok = specifiedPools [ replaceScheme ( k ) ]
if ok {
update = true // Looks like user is changing from http->https or https->http
}
}
2022-01-10 12:07:49 -05:00
if ! ok {
return false , fmt . Errorf ( "pool(%s) = %s is not specified, please specify on server command line" , humanize . Ordinal ( pi . position + 1 ) , k )
}
}
// check when remembered pools and specified pools are same they are at the expected position
if len ( rememberedPools ) == len ( specifiedPools ) {
for k , pi := range rememberedPools {
pos , ok := specifiedPools [ k ]
2022-04-28 19:27:53 -04:00
if ! ok {
pos , ok = specifiedPools [ replaceScheme ( k ) ]
if ok {
update = true // Looks like user is changing from http->https or https->http
}
}
2022-01-10 12:07:49 -05:00
if ! ok {
return false , fmt . Errorf ( "pool(%s) = %s is not specified, please specify on server command line" , humanize . Ordinal ( pi . position + 1 ) , k )
}
if pos != pi . position {
return false , fmt . Errorf ( "pool order change detected for %s, expected position is (%s) but found (%s)" , k , humanize . Ordinal ( pi . position + 1 ) , humanize . Ordinal ( pos + 1 ) )
}
}
}
2022-04-28 19:27:53 -04:00
if ! update {
update = len ( rememberedPools ) != len ( specifiedPools )
}
2022-01-14 13:32:35 -05:00
if update {
for k , pi := range rememberedPools {
if pi . decomStarted && ! pi . completed {
return false , fmt . Errorf ( "pool(%s) = %s is being decommissioned, No changes should be made to the command line arguments. Please complete the decommission in progress" , humanize . Ordinal ( pi . position + 1 ) , k )
}
}
}
return update , nil
}
func ( p * poolMeta ) load ( ctx context . Context , pool * erasureSets , pools [ ] * erasureSets ) error {
data , err := readConfig ( ctx , pool , poolMetaName )
if err != nil {
if errors . Is ( err , errConfigNotFound ) || isErrObjectNotFound ( err ) {
return nil
}
return err
}
if len ( data ) == 0 {
// Seems to be empty create a new poolMeta object.
return nil
}
if len ( data ) <= 4 {
return fmt . Errorf ( "poolMeta: no data" )
}
// Read header
switch binary . LittleEndian . Uint16 ( data [ 0 : 2 ] ) {
case poolMetaFormat :
default :
return fmt . Errorf ( "poolMeta: unknown format: %d" , binary . LittleEndian . Uint16 ( data [ 0 : 2 ] ) )
}
switch binary . LittleEndian . Uint16 ( data [ 2 : 4 ] ) {
case poolMetaVersion :
default :
return fmt . Errorf ( "poolMeta: unknown version: %d" , binary . LittleEndian . Uint16 ( data [ 2 : 4 ] ) )
}
// OK, parse data.
if _ , err = p . UnmarshalMsg ( data [ 4 : ] ) ; err != nil {
return err
}
switch p . Version {
case poolMetaVersionV1 :
default :
return fmt . Errorf ( "unexpected pool meta version: %d" , p . Version )
}
return nil
2022-01-10 12:07:49 -05:00
}
func ( p * poolMeta ) CountItem ( idx int , size int64 , failed bool ) {
pd := p . Pools [ idx ] . Decommission
if pd != nil {
if failed {
pd . ItemsDecommissionFailed ++
2022-01-11 21:48:43 -05:00
pd . BytesFailed += size
2022-01-10 12:07:49 -05:00
} else {
pd . ItemsDecommissioned ++
2022-01-11 21:48:43 -05:00
pd . BytesDone += size
2022-01-10 12:07:49 -05:00
}
p . Pools [ idx ] . Decommission = pd
}
}
2022-01-11 21:48:43 -05:00
func ( p * poolMeta ) updateAfter ( ctx context . Context , idx int , pools [ ] * erasureSets , duration time . Duration ) ( bool , error ) {
2022-01-10 12:07:49 -05:00
if p . Pools [ idx ] . Decommission == nil {
2022-01-11 21:48:43 -05:00
return false , errInvalidArgument
2022-01-10 12:07:49 -05:00
}
2022-01-11 21:48:43 -05:00
now := UTCNow ( )
if now . Sub ( p . Pools [ idx ] . LastUpdate ) >= duration {
if serverDebugLog {
console . Debugf ( "decommission: persisting poolMeta on disk: threshold:%s, poolMeta:%#v\n" , now . Sub ( p . Pools [ idx ] . LastUpdate ) , p . Pools [ idx ] )
}
p . Pools [ idx ] . LastUpdate = now
if err := p . save ( ctx , pools ) ; err != nil {
return false , err
}
return true , nil
2022-01-10 12:07:49 -05:00
}
2022-01-11 21:48:43 -05:00
return false , nil
2022-01-10 12:07:49 -05:00
}
// save serializes p (4-byte format/version header followed by the msgp
// payload) and writes it as poolMetaName to every pool in pools, stopping at
// the first write error.
func (p poolMeta) save(ctx context.Context, pools []*erasureSets) error {
	header := make([]byte, 4, p.Msgsize()+4)

	// Initialize the header.
	binary.LittleEndian.PutUint16(header[:2], poolMetaFormat)
	binary.LittleEndian.PutUint16(header[2:], poolMetaVersion)

	payload, err := p.MarshalMsg(header)
	if err != nil {
		return err
	}

	// Saves on all pools to make sure decommissioning of first pool is allowed.
	for i := range pools {
		if err = saveConfig(ctx, pools[i], poolMetaName, payload); err != nil {
			return err
		}
	}
	return nil
}
// Pool metadata persistence constants.
const (
// poolMetaName is the config name the pool metadata is stored under.
poolMetaName = "pool.bin"
// poolMetaFormat and poolMetaVersion are written to, and checked
// against, the 4-byte header of the stored metadata.
poolMetaFormat = 1
poolMetaVersionV1 = 1
poolMetaVersion = poolMetaVersionV1
)
// Init() initializes pools and saves additional information about them
// in 'pool.bin', this is eventually used for decommissioning the pool.
func ( z * erasureServerPools ) Init ( ctx context . Context ) error {
meta := poolMeta { }
2022-01-14 13:32:35 -05:00
if err := meta . load ( ctx , z . serverPools [ 0 ] , z . serverPools ) ; err != nil {
return err
}
update , err := meta . validate ( z . serverPools )
2022-01-10 12:07:49 -05:00
if err != nil {
return err
}
// if no update is needed return right away.
if ! update {
2022-04-07 02:42:05 -04:00
z . poolMeta = meta
2022-01-10 12:07:49 -05:00
// We are only supporting single pool decommission at this time
// so it makes sense to only resume single pools at any given
// time, in future meta.returnResumablePools() might take
// '-1' as argument to decommission multiple pools at a time
// but this is not a priority at the moment.
for _ , pool := range meta . returnResumablePools ( 1 ) {
2022-04-07 02:42:05 -04:00
idx := globalEndpoints . GetPoolIdx ( pool . CmdLine )
if idx == - 1 {
return fmt . Errorf ( "unexpected state present for decommission status pool(%s) not found" , pool . CmdLine )
}
if globalEndpoints [ idx ] . Endpoints [ 0 ] . IsLocal {
go func ( pool PoolStatus ) {
2022-07-07 15:31:44 -04:00
r := rand . New ( rand . NewSource ( time . Now ( ) . UnixNano ( ) ) )
for {
if err := z . Decommission ( ctx , pool . ID ) ; err != nil {
switch err {
// we already started decommission
case errDecommissionAlreadyRunning :
// A previous decommission running found restart it.
z . doDecommissionInRoutine ( ctx , idx )
return
default :
if configRetriableErrors ( err ) {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to resume decommission of pool %v: %w: retrying.." , pool , err ) )
time . Sleep ( time . Second + time . Duration ( r . Float64 ( ) * float64 ( 5 * time . Second ) ) )
continue
}
logger . LogIf ( ctx , fmt . Errorf ( "Unable to resume decommission of pool %v: %w" , pool , err ) )
return
}
}
break
2022-04-07 02:42:05 -04:00
}
} ( pool )
}
2022-01-10 12:07:49 -05:00
}
return nil
}
2022-01-11 21:48:43 -05:00
meta = poolMeta { } // to update write poolMeta fresh.
2022-01-10 12:07:49 -05:00
// looks like new pool was added we need to update,
// or this is a fresh installation (or an existing
// installation with pool removed)
meta . Version = poolMetaVersion
for idx , pool := range z . serverPools {
meta . Pools = append ( meta . Pools , PoolStatus {
CmdLine : pool . endpoints . CmdLine ,
ID : idx ,
2022-01-11 21:48:43 -05:00
LastUpdate : UTCNow ( ) ,
2022-01-10 12:07:49 -05:00
} )
}
if err = meta . save ( ctx , z . serverPools ) ; err != nil {
return err
}
z . poolMeta = meta
return nil
}
func ( z * erasureServerPools ) decommissionObject ( ctx context . Context , bucket string , gr * GetObjectReader ) ( err error ) {
objInfo := gr . ObjInfo
2022-05-04 03:45:27 -04:00
defer func ( ) {
gr . Close ( )
auditLogDecom ( ctx , "DecomCopyData" , objInfo . Bucket , objInfo . Name , objInfo . VersionID , err )
} ( )
2022-01-10 12:07:49 -05:00
if objInfo . isMultipart ( ) {
2022-01-10 20:26:00 -05:00
uploadID , err := z . NewMultipartUpload ( ctx , bucket , objInfo . Name , ObjectOptions {
VersionID : objInfo . VersionID ,
MTime : objInfo . ModTime ,
UserDefined : objInfo . UserDefined ,
} )
2022-01-10 12:07:49 -05:00
if err != nil {
return err
}
defer z . AbortMultipartUpload ( ctx , bucket , objInfo . Name , uploadID , ObjectOptions { } )
2022-01-10 20:26:00 -05:00
parts := make ( [ ] CompletePart , len ( objInfo . Parts ) )
for i , part := range objInfo . Parts {
2022-01-10 12:07:49 -05:00
hr , err := hash . NewReader ( gr , part . Size , "" , "" , part . Size )
if err != nil {
return err
}
2022-01-10 20:26:00 -05:00
pi , err := z . PutObjectPart ( ctx , bucket , objInfo . Name , uploadID ,
2022-01-10 12:07:49 -05:00
part . Number ,
NewPutObjReader ( hr ) ,
ObjectOptions { } )
if err != nil {
return err
}
2022-01-10 20:26:00 -05:00
parts [ i ] = CompletePart {
ETag : pi . ETag ,
PartNumber : pi . PartNumber ,
}
2022-01-10 12:07:49 -05:00
}
_ , err = z . CompleteMultipartUpload ( ctx , bucket , objInfo . Name , uploadID , parts , ObjectOptions {
MTime : objInfo . ModTime ,
} )
return err
}
hr , err := hash . NewReader ( gr , objInfo . Size , "" , "" , objInfo . Size )
if err != nil {
return err
}
_ , err = z . PutObject ( ctx ,
bucket ,
objInfo . Name ,
NewPutObjReader ( hr ) ,
ObjectOptions {
VersionID : objInfo . VersionID ,
MTime : objInfo . ModTime ,
UserDefined : objInfo . UserDefined ,
} )
return err
}
// versionsSorter is a []FileInfo with sorting helpers attached; see reverse
// for the ordering it applies.
//msgp:ignore versionsSorter
type versionsSorter [ ] FileInfo
// reverse sorts the versions in place by ascending ModTime, i.e. oldest
// version first.
func (v versionsSorter) reverse() {
	olderFirst := func(a, b int) bool {
		return v[a].ModTime.Before(v[b].ModTime)
	}
	sort.Slice(v, olderFirst)
}
2022-07-04 17:02:54 -04:00
func ( z * erasureServerPools ) decommissionPool ( ctx context . Context , idx int , pool * erasureSets , bi decomBucketInfo ) error {
2022-05-04 03:45:27 -04:00
ctx = logger . SetReqInfo ( ctx , & logger . ReqInfo { } )
2022-04-08 02:19:13 -04:00
var wg sync . WaitGroup
wStr := env . Get ( "_MINIO_DECOMMISSION_WORKERS" , strconv . Itoa ( len ( pool . sets ) ) )
2022-04-12 13:49:53 -04:00
workerSize , err := strconv . Atoi ( wStr )
if err != nil {
return err
}
2022-04-08 02:19:13 -04:00
parallelWorkers := make ( chan struct { } , workerSize )
2022-01-10 12:07:49 -05:00
for _ , set := range pool . sets {
2022-04-08 02:19:13 -04:00
set := set
2022-01-10 12:07:49 -05:00
disks := set . getOnlineDisks ( )
if len ( disks ) == 0 {
logger . LogIf ( GlobalContext , fmt . Errorf ( "no online disks found for set with endpoints %s" ,
set . getEndpoints ( ) ) )
continue
}
2022-07-04 17:02:54 -04:00
vc , _ := globalBucketVersioningSys . Get ( bi . Name )
2022-01-10 12:07:49 -05:00
decommissionEntry := func ( entry metaCacheEntry ) {
2022-04-12 13:49:53 -04:00
defer func ( ) {
<- parallelWorkers
2022-04-18 16:26:29 -04:00
wg . Done ( )
2022-04-12 13:49:53 -04:00
} ( )
2022-04-26 23:06:41 -04:00
2022-01-10 12:07:49 -05:00
if entry . isDir ( ) {
return
}
2022-07-04 17:02:54 -04:00
fivs , err := entry . fileInfoVersions ( bi . Name )
2022-01-10 12:07:49 -05:00
if err != nil {
return
}
2022-03-07 19:18:57 -05:00
// We need a reversed order for decommissioning,
2022-01-10 12:07:49 -05:00
// to create the appropriate stack.
versionsSorter ( fivs . Versions ) . reverse ( )
2022-04-26 23:06:41 -04:00
var decommissionedCount int
2022-01-10 12:07:49 -05:00
for _ , version := range fivs . Versions {
// TODO: Skip transitioned objects for now.
if version . IsRemote ( ) {
2022-07-04 17:02:54 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "found %s/%s transitioned object, transitioned object won't be decommissioned" , bi . Name , version . Name ) )
2022-01-10 12:07:49 -05:00
continue
}
// We will skip decommissioning delete markers
// with single version, its as good as there
// is no data associated with the object.
if version . Deleted && len ( fivs . Versions ) == 1 {
2022-07-04 17:02:54 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "found %s/%s delete marked object with no other versions, skipping since there is no content left" , bi . Name , version . Name ) )
2022-01-10 12:07:49 -05:00
continue
}
if version . Deleted {
_ , err := z . DeleteObject ( ctx ,
2022-07-04 17:02:54 -04:00
bi . Name ,
2022-01-10 12:07:49 -05:00
version . Name ,
ObjectOptions {
2022-05-06 22:05:28 -04:00
Versioned : vc . PrefixEnabled ( version . Name ) ,
2022-01-10 12:07:49 -05:00
VersionID : version . VersionID ,
MTime : version . ModTime ,
DeleteReplication : version . ReplicationState ,
2022-07-05 10:37:24 -04:00
DeleteMarker : true , // make sure we create a delete marker
2022-01-10 12:07:49 -05:00
} )
2022-04-26 23:06:41 -04:00
var failure bool
2022-01-10 12:07:49 -05:00
if err != nil {
logger . LogIf ( ctx , err )
2022-04-26 23:06:41 -04:00
failure = true
}
z . poolMetaMutex . Lock ( )
z . poolMeta . CountItem ( idx , 0 , failure )
z . poolMetaMutex . Unlock ( )
2022-07-05 10:37:24 -04:00
if ! failure {
// Success keep a count.
decommissionedCount ++
2022-01-10 12:07:49 -05:00
}
continue
}
2022-04-26 23:06:41 -04:00
var failure bool
2022-01-10 12:07:49 -05:00
// gr.Close() is ensured by decommissionObject().
2022-05-11 14:37:32 -04:00
for try := 0 ; try < 3 ; try ++ {
gr , err := set . GetObjectNInfo ( ctx ,
2022-07-04 17:02:54 -04:00
bi . Name ,
2022-05-11 14:37:32 -04:00
encodeDirObject ( version . Name ) ,
nil ,
http . Header { } ,
noLock , // all mutations are blocked reads are safe without locks.
ObjectOptions {
VersionID : version . VersionID ,
} )
2022-05-18 20:58:19 -04:00
if isErrObjectNotFound ( err ) {
// object deleted by the application, nothing to do here we move on.
return
}
2022-05-11 14:37:32 -04:00
if err != nil {
failure = true
logger . LogIf ( ctx , err )
continue
}
2022-07-04 17:02:54 -04:00
if err = z . decommissionObject ( ctx , bi . Name , gr ) ; err != nil {
2022-05-11 14:37:32 -04:00
failure = true
logger . LogIf ( ctx , err )
continue
}
failure = false
break
2022-04-26 23:06:41 -04:00
}
z . poolMetaMutex . Lock ( )
z . poolMeta . CountItem ( idx , version . Size , failure )
z . poolMetaMutex . Unlock ( )
if failure {
break // break out on first error
2022-01-10 12:07:49 -05:00
}
2022-04-26 23:06:41 -04:00
decommissionedCount ++
}
// if all versions were decommissioned, then we can delete the object versions.
if decommissionedCount == len ( fivs . Versions ) {
2022-05-04 03:45:27 -04:00
_ , err := set . DeleteObject ( ctx ,
2022-07-04 17:02:54 -04:00
bi . Name ,
2022-05-18 20:58:19 -04:00
encodeDirObject ( entry . name ) ,
2022-01-10 12:07:49 -05:00
ObjectOptions {
2022-04-26 23:06:41 -04:00
DeletePrefix : true , // use prefix delete to delete all versions at once.
} ,
)
2022-07-04 17:02:54 -04:00
auditLogDecom ( ctx , "DecomDeleteObject" , bi . Name , entry . name , "" , err )
2022-05-04 03:45:27 -04:00
if err != nil {
logger . LogIf ( ctx , err )
}
2022-01-10 12:07:49 -05:00
}
z . poolMetaMutex . Lock ( )
2022-07-04 17:02:54 -04:00
z . poolMeta . TrackCurrentBucketObject ( idx , bi . Name , entry . name )
2022-01-11 21:48:43 -05:00
ok , err := z . poolMeta . updateAfter ( ctx , idx , z . serverPools , 30 * time . Second )
logger . LogIf ( ctx , err )
if ok {
globalNotificationSys . ReloadPoolMeta ( ctx )
}
2022-01-10 12:07:49 -05:00
z . poolMetaMutex . Unlock ( )
}
// How to resolve partial results.
resolver := metadataResolutionParams {
2022-01-27 20:00:15 -05:00
dirQuorum : len ( disks ) / 2 , // make sure to capture all quorum ratios
objQuorum : len ( disks ) / 2 , // make sure to capture all quorum ratios
2022-07-04 17:02:54 -04:00
bucket : bi . Name ,
2022-01-10 12:07:49 -05:00
}
2022-04-08 02:19:13 -04:00
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
err := listPathRaw ( ctx , listPathRawOptions {
disks : disks ,
2022-07-04 17:02:54 -04:00
bucket : bi . Name ,
path : bi . Prefix ,
2022-04-08 02:19:13 -04:00
recursive : true ,
forwardTo : "" ,
minDisks : len ( disks ) / 2 , // to capture all quorum ratios
reportNotFound : false ,
2022-04-12 13:49:53 -04:00
agreed : func ( entry metaCacheEntry ) {
parallelWorkers <- struct { } { }
2022-04-18 16:26:29 -04:00
wg . Add ( 1 )
2022-04-12 13:49:53 -04:00
go decommissionEntry ( entry )
} ,
2022-07-07 16:45:34 -04:00
partial : func ( entries metaCacheEntries , _ [ ] error ) {
2022-04-08 02:19:13 -04:00
entry , ok := entries . resolve ( & resolver )
if ok {
2022-04-12 13:49:53 -04:00
parallelWorkers <- struct { } { }
2022-04-18 16:26:29 -04:00
wg . Add ( 1 )
2022-04-12 13:49:53 -04:00
go decommissionEntry ( * entry )
2022-04-08 02:19:13 -04:00
}
} ,
finished : nil ,
} )
logger . LogIf ( ctx , err )
} ( )
2022-01-10 12:07:49 -05:00
}
2022-04-08 02:19:13 -04:00
wg . Wait ( )
2022-01-10 12:07:49 -05:00
return nil
}
func ( z * erasureServerPools ) decommissionInBackground ( ctx context . Context , idx int ) error {
pool := z . serverPools [ idx ]
for _ , bucket := range z . poolMeta . PendingBuckets ( idx ) {
2022-07-04 17:02:54 -04:00
if z . poolMeta . isBucketDecommissioned ( idx , bucket . String ( ) ) {
2022-01-10 20:26:00 -05:00
if serverDebugLog {
console . Debugln ( "decommission: already done, moving on" , bucket )
}
2022-01-10 12:07:49 -05:00
z . poolMetaMutex . Lock ( )
z . poolMeta . BucketDone ( idx , bucket ) // remove from pendingBuckets and persist.
z . poolMeta . save ( ctx , z . serverPools )
z . poolMetaMutex . Unlock ( )
continue
}
2022-01-10 20:26:00 -05:00
if serverDebugLog {
2022-07-04 17:02:54 -04:00
console . Debugln ( "decommission: currently on bucket" , bucket . Name )
2022-01-10 20:26:00 -05:00
}
2022-01-10 12:07:49 -05:00
if err := z . decommissionPool ( ctx , idx , pool , bucket ) ; err != nil {
return err
}
z . poolMetaMutex . Lock ( )
z . poolMeta . BucketDone ( idx , bucket )
z . poolMeta . save ( ctx , z . serverPools )
z . poolMetaMutex . Unlock ( )
}
return nil
}
func ( z * erasureServerPools ) doDecommissionInRoutine ( ctx context . Context , idx int ) {
z . poolMetaMutex . Lock ( )
var dctx context . Context
dctx , z . decommissionCancelers [ idx ] = context . WithCancel ( GlobalContext )
z . poolMetaMutex . Unlock ( )
2022-05-04 03:45:27 -04:00
// Generate an empty request info so it can be directly modified later by audit
dctx = logger . SetReqInfo ( dctx , & logger . ReqInfo { } )
2022-01-10 12:07:49 -05:00
if err := z . decommissionInBackground ( dctx , idx ) ; err != nil {
logger . LogIf ( GlobalContext , err )
logger . LogIf ( GlobalContext , z . DecommissionFailed ( dctx , idx ) )
return
}
2022-04-26 23:06:41 -04:00
z . poolMetaMutex . Lock ( )
failed := z . poolMeta . Pools [ idx ] . Decommission . ItemsDecommissionFailed > 0
z . poolMetaMutex . Unlock ( )
if failed {
// Decommission failed indicate as such.
logger . LogIf ( GlobalContext , z . DecommissionFailed ( dctx , idx ) )
} else {
// Complete the decommission..
logger . LogIf ( GlobalContext , z . CompleteDecommission ( dctx , idx ) )
}
2022-01-10 12:07:49 -05:00
}
2022-01-11 15:27:47 -05:00
func ( z * erasureServerPools ) IsSuspended ( idx int ) bool {
2022-04-07 02:42:05 -04:00
z . poolMetaMutex . RLock ( )
defer z . poolMetaMutex . RUnlock ( )
2022-01-11 15:27:47 -05:00
return z . poolMeta . IsSuspended ( idx )
}
2022-01-10 12:07:49 -05:00
// Decommission - start decommission session.
//
// It rejects a negative idx and single-pool setups with errInvalidArgument,
// marks the pool as decommissioning via StartDecommission, and then runs the
// decommission in a background goroutine. A nil return only means the
// decommission was started.
func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error {
	if idx < 0 || z.SinglePool() {
		return errInvalidArgument
	}

	// Make pool unwritable before decommissioning.
	err := z.StartDecommission(ctx, idx)
	if err != nil {
		return err
	}

	go z.doDecommissionInRoutine(ctx, idx)

	// Successfully started decommissioning.
	return nil
}
2022-01-11 21:48:43 -05:00
// decomError is an error that carries a plain decommission failure message.
type decomError struct {
	Err string
}

// Error returns the stored message, satisfying the error interface.
func (e decomError) Error() string {
	msg := e.Err
	return msg
}
// poolSpaceInfo summarizes a pool's usable capacity figures: free, total,
// and used (total minus free).
type poolSpaceInfo struct {
Free int64
Total int64
Used int64
}
func ( z * erasureServerPools ) getDecommissionPoolSpaceInfo ( idx int ) ( pi poolSpaceInfo , err error ) {
if idx < 0 {
return pi , errInvalidArgument
}
if idx + 1 > len ( z . serverPools ) {
return pi , errInvalidArgument
}
2022-03-30 01:51:31 -04:00
info , _ := z . serverPools [ idx ] . StorageInfo ( context . Background ( ) )
2022-03-30 13:48:35 -04:00
info . Backend = z . BackendInfo ( )
2022-01-11 21:48:43 -05:00
for _ , disk := range info . Disks {
if disk . Healing {
return pi , decomError {
Err : fmt . Sprintf ( "%s drive is healing, decommission will not be started" , disk . Endpoint ) ,
}
}
}
usableTotal := int64 ( GetTotalUsableCapacity ( info . Disks , info ) )
usableFree := int64 ( GetTotalUsableCapacityFree ( info . Disks , info ) )
return poolSpaceInfo {
Total : usableTotal ,
Free : usableFree ,
Used : usableTotal - usableFree ,
} , nil
}
2022-01-10 12:07:49 -05:00
func ( z * erasureServerPools ) Status ( ctx context . Context , idx int ) ( PoolStatus , error ) {
if idx < 0 {
return PoolStatus { } , errInvalidArgument
}
z . poolMetaMutex . RLock ( )
defer z . poolMetaMutex . RUnlock ( )
2022-01-11 21:48:43 -05:00
pi , err := z . getDecommissionPoolSpaceInfo ( idx )
if err != nil {
2022-07-01 19:21:23 -04:00
return PoolStatus { } , err
2022-01-10 12:07:49 -05:00
}
poolInfo := z . poolMeta . Pools [ idx ]
if poolInfo . Decommission != nil {
2022-01-11 21:48:43 -05:00
poolInfo . Decommission . TotalSize = pi . Total
poolInfo . Decommission . CurrentSize = poolInfo . Decommission . StartSize + poolInfo . Decommission . BytesDone
2022-01-10 12:07:49 -05:00
} else {
poolInfo . Decommission = & PoolDecommissionInfo {
2022-01-11 21:48:43 -05:00
TotalSize : pi . Total ,
CurrentSize : pi . Free ,
2022-01-10 12:07:49 -05:00
}
}
return poolInfo , nil
}
func ( z * erasureServerPools ) ReloadPoolMeta ( ctx context . Context ) ( err error ) {
meta := poolMeta { }
2022-01-14 13:32:35 -05:00
if err = meta . load ( ctx , z . serverPools [ 0 ] , z . serverPools ) ; err != nil {
2022-01-10 12:07:49 -05:00
return err
}
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
z . poolMeta = meta
return nil
}
func ( z * erasureServerPools ) DecommissionCancel ( ctx context . Context , idx int ) ( err error ) {
if idx < 0 {
return errInvalidArgument
}
if z . SinglePool ( ) {
return errInvalidArgument
}
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
if z . poolMeta . DecommissionCancel ( idx ) {
2022-07-07 15:31:44 -04:00
if fn := z . decommissionCancelers [ idx ] ; fn != nil {
defer fn ( ) // cancel any active thread.
}
2022-01-10 12:07:49 -05:00
if err = z . poolMeta . save ( ctx , z . serverPools ) ; err != nil {
return err
}
globalNotificationSys . ReloadPoolMeta ( ctx )
}
return nil
}
func ( z * erasureServerPools ) DecommissionFailed ( ctx context . Context , idx int ) ( err error ) {
if idx < 0 {
return errInvalidArgument
}
if z . SinglePool ( ) {
return errInvalidArgument
}
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
if z . poolMeta . DecommissionFailed ( idx ) {
2022-07-07 15:31:44 -04:00
if fn := z . decommissionCancelers [ idx ] ; fn != nil {
defer fn ( ) // cancel any active thread.
}
2022-01-10 12:07:49 -05:00
if err = z . poolMeta . save ( ctx , z . serverPools ) ; err != nil {
return err
}
globalNotificationSys . ReloadPoolMeta ( ctx )
}
return nil
}
func ( z * erasureServerPools ) CompleteDecommission ( ctx context . Context , idx int ) ( err error ) {
if idx < 0 {
return errInvalidArgument
}
if z . SinglePool ( ) {
return errInvalidArgument
}
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
if z . poolMeta . DecommissionComplete ( idx ) {
2022-07-07 15:31:44 -04:00
if fn := z . decommissionCancelers [ idx ] ; fn != nil {
defer fn ( ) // cancel any active thread.
}
2022-01-10 12:07:49 -05:00
if err = z . poolMeta . save ( ctx , z . serverPools ) ; err != nil {
return err
}
globalNotificationSys . ReloadPoolMeta ( ctx )
}
return nil
}
func ( z * erasureServerPools ) StartDecommission ( ctx context . Context , idx int ) ( err error ) {
if idx < 0 {
return errInvalidArgument
}
if z . SinglePool ( ) {
return errInvalidArgument
}
buckets , err := z . ListBuckets ( ctx )
if err != nil {
return err
}
2022-07-04 17:02:54 -04:00
decomBuckets := make ( [ ] decomBucketInfo , len ( buckets ) )
for i := range buckets {
decomBuckets [ i ] = decomBucketInfo {
Name : buckets [ i ] . Name ,
}
}
2022-01-10 12:07:49 -05:00
// TODO: Support decommissioning transition tiers.
2022-07-04 17:02:54 -04:00
for _ , bucket := range decomBuckets {
2022-01-10 12:07:49 -05:00
if lc , err := globalLifecycleSys . Get ( bucket . Name ) ; err == nil {
if lc . HasTransition ( ) {
2022-01-11 21:48:43 -05:00
return decomError {
Err : fmt . Sprintf ( "Bucket is part of transitioned tier %s: decommission is not allowed in Tier'd setups" , bucket . Name ) ,
}
2022-01-10 12:07:49 -05:00
}
}
}
2022-03-07 19:18:57 -05:00
// Create .minio.sys/conifg, .minio.sys/buckets paths if missing,
// this code is present to avoid any missing meta buckets on other
// pools.
for _ , metaBucket := range [ ] string {
pathJoin ( minioMetaBucket , minioConfigPrefix ) ,
pathJoin ( minioMetaBucket , bucketMetaPrefix ) ,
} {
2022-03-14 14:25:24 -04:00
var bucketExists BucketExists
2022-03-07 19:18:57 -05:00
if err = z . MakeBucketWithLocation ( ctx , metaBucket , BucketOptions { } ) ; err != nil {
2022-03-14 14:25:24 -04:00
if ! errors . As ( err , & bucketExists ) {
return err
}
2022-03-07 19:18:57 -05:00
}
}
2022-01-10 12:07:49 -05:00
// Buckets data are dispersed in multiple zones/sets, make
// sure to decommission the necessary metadata.
2022-07-04 17:02:54 -04:00
decomBuckets = append ( decomBuckets , decomBucketInfo {
Name : minioMetaBucket ,
Prefix : minioConfigPrefix ,
2022-01-10 20:26:00 -05:00
} )
2022-07-04 17:02:54 -04:00
decomBuckets = append ( decomBuckets , decomBucketInfo {
Name : minioMetaBucket ,
Prefix : bucketMetaPrefix ,
2022-01-10 12:07:49 -05:00
} )
var pool * erasureSets
for pidx := range z . serverPools {
if pidx == idx {
pool = z . serverPools [ idx ]
break
}
}
if pool == nil {
return errInvalidArgument
}
2022-01-11 21:48:43 -05:00
pi , err := z . getDecommissionPoolSpaceInfo ( idx )
if err != nil {
return err
2022-01-10 12:07:49 -05:00
}
z . poolMetaMutex . Lock ( )
defer z . poolMetaMutex . Unlock ( )
2022-01-11 21:48:43 -05:00
if err = z . poolMeta . Decommission ( idx , pi ) ; err != nil {
2022-01-10 12:07:49 -05:00
return err
}
2022-07-04 17:02:54 -04:00
z . poolMeta . QueueBuckets ( idx , decomBuckets )
2022-01-10 12:07:49 -05:00
if err = z . poolMeta . save ( ctx , z . serverPools ) ; err != nil {
return err
}
globalNotificationSys . ReloadPoolMeta ( ctx )
return nil
}
2022-05-04 03:45:27 -04:00
func auditLogDecom ( ctx context . Context , apiName , bucket , object , versionID string , err error ) {
errStr := ""
if err != nil {
errStr = err . Error ( )
}
auditLogInternal ( ctx , bucket , object , AuditLogOptions {
2022-07-12 13:43:32 -04:00
Event : "decommission" ,
2022-05-04 03:45:27 -04:00
APIName : apiName ,
VersionID : versionID ,
Error : errStr ,
} )
}