feat: allow decom of multiple pools (#16416)

This commit is contained in:
Harshavardhana 2023-01-16 21:36:34 +05:30 committed by GitHub
parent beb1924437
commit 095fc0561d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 80 deletions

View File

@ -22,6 +22,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"strings"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
@ -49,28 +50,53 @@ func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Reque
return return
} }
pools, ok := objectAPI.(*erasureServerPools) z, ok := objectAPI.(*erasureServerPools)
if !ok { if !ok || len(z.serverPools) == 1 {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return return
} }
if pools.IsRebalanceStarted() { if z.IsDecommissionRunning() {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errDecommissionRebalanceAlreadyRunning), r.URL) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errDecommissionAlreadyRunning), r.URL)
return
}
if z.IsRebalanceStarted() {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminRebalanceAlreadyStarted), r.URL)
return return
} }
vars := mux.Vars(r) vars := mux.Vars(r)
v := vars["pool"] v := vars["pool"]
idx := globalEndpoints.GetPoolIdx(v) pools := strings.Split(v, ",")
poolIndices := make([]int, 0, len(pools))
for _, pool := range pools {
idx := globalEndpoints.GetPoolIdx(pool)
if idx == -1 { if idx == -1 {
// We didn't find any matching pools, invalid input // We didn't find any matching pools, invalid input
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
return return
} }
var pool *erasureSets
for pidx := range z.serverPools {
if pidx == idx {
pool = z.serverPools[idx]
break
}
}
if pool == nil {
// We didn't find any matching pools, invalid input
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
return
}
if ep := globalEndpoints[idx].Endpoints[0]; !ep.IsLocal { poolIndices = append(poolIndices, idx)
}
if len(poolIndices) > 0 && globalEndpoints[poolIndices[0]].Endpoints[0].IsLocal {
ep := globalEndpoints[poolIndices[0]].Endpoints[0]
for nodeIdx, proxyEp := range globalProxyEndpoints { for nodeIdx, proxyEp := range globalProxyEndpoints {
if proxyEp.Endpoint.Host == ep.Host { if proxyEp.Endpoint.Host == ep.Host {
if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) { if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
@ -80,7 +106,7 @@ func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Reque
} }
} }
if err := pools.Decommission(r.Context(), idx); err != nil { if err := z.Decommission(r.Context(), poolIndices...); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return return
} }

View File

@ -126,7 +126,7 @@ type poolMeta struct {
// A decommission resumable tells us if decommission is worth // A decommission resumable tells us if decommission is worth
// resuming upon restart of a cluster. // resuming upon restart of a cluster.
func (p *poolMeta) returnResumablePools(n int) []PoolStatus { func (p *poolMeta) returnResumablePools() []PoolStatus {
var newPools []PoolStatus var newPools []PoolStatus
for _, pool := range p.Pools { for _, pool := range p.Pools {
if pool.Decommission == nil { if pool.Decommission == nil {
@ -139,11 +139,8 @@ func (p *poolMeta) returnResumablePools(n int) []PoolStatus {
continue continue
} // In all other situations we need to resume } // In all other situations we need to resume
newPools = append(newPools, pool) newPools = append(newPools, pool)
if n > 0 && len(newPools) == n { }
return newPools return newPools
}
}
return nil
} }
func (p *poolMeta) DecommissionComplete(idx int) bool { func (p *poolMeta) DecommissionComplete(idx int) bool {
@ -251,18 +248,6 @@ var (
) )
func (p *poolMeta) Decommission(idx int, pi poolSpaceInfo) error { func (p *poolMeta) Decommission(idx int, pi poolSpaceInfo) error {
for i, pool := range p.Pools {
if idx == i {
continue
}
if pool.Decommission != nil {
// Do not allow multiple decommissions at the same time.
// We shall for now only allow one pool decommission at
// a time.
return fmt.Errorf("%w at index: %d", errDecommissionAlreadyRunning, i)
}
}
// Return an error when there is decommission on going - the user needs // Return an error when there is decommission on going - the user needs
// to explicitly cancel it first in order to restart decommissioning again. // to explicitly cancel it first in order to restart decommissioning again.
if p.Pools[idx].Decommission != nil && if p.Pools[idx].Decommission != nil &&
@ -510,7 +495,6 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
z.StartRebalance() z.StartRebalance()
meta := poolMeta{} meta := poolMeta{}
if err := meta.load(ctx, z.serverPools[0], z.serverPools); err != nil { if err := meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
return err return err
} }
@ -524,38 +508,38 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
if !update { if !update {
z.poolMeta = meta z.poolMeta = meta
// We are only supporting single pool decommission at this time pools := meta.returnResumablePools()
// so it makes sense to only resume single pools at any given poolIndices := make([]int, 0, len(pools))
// time, in future meta.returnResumablePools() might take for _, pool := range pools {
// '-1' as argument to decommission multiple pools at a time
// but this is not a priority at the moment.
for _, pool := range meta.returnResumablePools(1) {
idx := globalEndpoints.GetPoolIdx(pool.CmdLine) idx := globalEndpoints.GetPoolIdx(pool.CmdLine)
if idx == -1 { if idx == -1 {
return fmt.Errorf("unexpected state present for decommission status pool(%s) not found", pool.CmdLine) return fmt.Errorf("unexpected state present for decommission status pool(%s) not found", pool.CmdLine)
} }
if globalEndpoints[idx].Endpoints[0].IsLocal { poolIndices = append(poolIndices, idx)
go func(pool PoolStatus) { }
if len(poolIndices) > 0 && globalEndpoints[poolIndices[0]].Endpoints[0].IsLocal {
go func() {
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
for { for {
if err := z.Decommission(ctx, pool.ID); err != nil { if err := z.Decommission(ctx, poolIndices...); err != nil {
if errors.Is(err, errDecommissionAlreadyRunning) { if errors.Is(err, errDecommissionAlreadyRunning) {
// A previous decommission running found restart it. // A previous decommission running found restart it.
for _, idx := range poolIndices {
z.doDecommissionInRoutine(ctx, idx) z.doDecommissionInRoutine(ctx, idx)
}
return return
} }
if configRetriableErrors(err) { if configRetriableErrors(err) {
logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w: retrying..", pool, err)) logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pools %v: %w: retrying..", pools, err))
time.Sleep(time.Second + time.Duration(r.Float64()*float64(5*time.Second))) time.Sleep(time.Second + time.Duration(r.Float64()*float64(5*time.Second)))
continue continue
} }
logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pool, err)) logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pools, err))
return return
} }
break
}
}(pool)
} }
}()
} }
return nil return nil
@ -1039,11 +1023,12 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in
} }
z.poolMetaMutex.Lock() z.poolMetaMutex.Lock()
failed := z.poolMeta.Pools[idx].Decommission.ItemsDecommissionFailed > 0 failed := z.poolMeta.Pools[idx].Decommission.ItemsDecommissionFailed > 0 || contextCanceled(dctx)
poolCmdLine := z.poolMeta.Pools[idx].CmdLine
z.poolMetaMutex.Unlock() z.poolMetaMutex.Unlock()
if !failed { if !failed {
logger.Info("Decommissioning almost complete - checking for left over objects") logger.Info("Decommissioning complete for pool '%s', verifying for any pending objects", poolCmdLine)
err := z.checkAfterDecom(dctx, idx) err := z.checkAfterDecom(dctx, idx)
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
@ -1067,8 +1052,8 @@ func (z *erasureServerPools) IsSuspended(idx int) bool {
} }
// Decommission - start decommission session. // Decommission - start decommission session.
func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error { func (z *erasureServerPools) Decommission(ctx context.Context, indices ...int) error {
if idx < 0 { if len(indices) == 0 {
return errInvalidArgument return errInvalidArgument
} }
@ -1077,11 +1062,15 @@ func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error {
} }
// Make pool unwritable before decommissioning. // Make pool unwritable before decommissioning.
if err := z.StartDecommission(ctx, idx); err != nil { if err := z.StartDecommission(ctx, indices...); err != nil {
return err return err
} }
go z.doDecommissionInRoutine(ctx, idx) go func() {
for _, idx := range indices {
z.doDecommissionInRoutine(ctx, idx)
}
}()
// Successfully started decommissioning. // Successfully started decommissioning.
return nil return nil
@ -1260,8 +1249,8 @@ func (z *erasureServerPools) getBucketsToDecommission(ctx context.Context) ([]de
return decomBuckets, nil return decomBuckets, nil
} }
func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (err error) { func (z *erasureServerPools) StartDecommission(ctx context.Context, indices ...int) (err error) {
if idx < 0 { if len(indices) == 0 {
return errInvalidArgument return errInvalidArgument
} }
@ -1304,34 +1293,28 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
} }
} }
var pool *erasureSets z.poolMetaMutex.Lock()
for pidx := range z.serverPools { defer z.poolMetaMutex.Unlock()
if pidx == idx {
pool = z.serverPools[idx]
break
}
}
if pool == nil {
return errInvalidArgument
}
for _, idx := range indices {
pi, err := z.getDecommissionPoolSpaceInfo(idx) pi, err := z.getDecommissionPoolSpaceInfo(idx)
if err != nil { if err != nil {
return err return err
} }
z.poolMetaMutex.Lock()
defer z.poolMetaMutex.Unlock()
if err = z.poolMeta.Decommission(idx, pi); err != nil { if err = z.poolMeta.Decommission(idx, pi); err != nil {
return err return err
} }
z.poolMeta.QueueBuckets(idx, decomBuckets) z.poolMeta.QueueBuckets(idx, decomBuckets)
}
if err = z.poolMeta.save(ctx, z.serverPools); err != nil { if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
return err return err
} }
globalNotificationSys.ReloadPoolMeta(ctx) globalNotificationSys.ReloadPoolMeta(ctx)
return nil return nil
} }