resume/start decommission on the first node of the pool under decommission (#14705)

Additionally fixes:

- IsSuspended() can use read locks
- Avoid a double-cancel panic on the canceler
Harshavardhana 2022-04-06 23:42:05 -07:00 committed by GitHub
parent a9eef521ec
commit ee49a23220
2 changed files with 45 additions and 16 deletions


@@ -58,6 +58,16 @@ func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Request)
 		return
 	}
+	if ep := globalEndpoints[idx].Endpoints[0]; !ep.IsLocal {
+		for nodeIdx, proxyEp := range globalProxyEndpoints {
+			if proxyEp.Endpoint.Host == ep.Host {
+				if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
+					return
+				}
+			}
+		}
+	}
 	if err := pools.Decommission(r.Context(), idx); err != nil {
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
 		return
@@ -96,6 +106,16 @@ func (a adminAPIHandlers) CancelDecommission(w http.ResponseWriter, r *http.Request)
 		return
 	}
+	if ep := globalEndpoints[idx].Endpoints[0]; !ep.IsLocal {
+		for nodeIdx, proxyEp := range globalProxyEndpoints {
+			if proxyEp.Endpoint.Host == ep.Host {
+				if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
+					return
+				}
+			}
+		}
+	}
 	if err := pools.DecommissionCancel(ctx, idx); err != nil {
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
 		return
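Review note: the two hunks above add the same proxy step to StartDecommission and CancelDecommission — if the first endpoint of the pool being decommissioned is not local, the admin request is forwarded to the peer that owns that endpoint and the handler returns. The standalone Go sketch below illustrates that general pattern with hypothetical types (node, proxyToOwner); it is not MinIO's proxyRequestByNodeIndex implementation.

	package sketch

	import (
		"net/http"
		"net/http/httputil"
		"net/url"
	)

	// node is a hypothetical stand-in for a proxy-endpoint entry.
	type node struct {
		Host    string // host:port of the peer
		IsLocal bool
	}

	// proxyToOwner forwards the request to the peer owning firstEndpoint and
	// reports whether it did so; callers return early on true, just as the
	// handlers above return once proxyRequestByNodeIndex succeeds.
	func proxyToOwner(w http.ResponseWriter, r *http.Request, firstEndpoint node, peers []node) bool {
		if firstEndpoint.IsLocal {
			return false // this server owns the pool's first endpoint; handle locally
		}
		for _, peer := range peers {
			if peer.Host == firstEndpoint.Host {
				rp := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: peer.Host})
				rp.ServeHTTP(w, r)
				return true
			}
		}
		return false // owner not found among known peers; fall back to local handling
	}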


@@ -141,7 +141,7 @@ func (p *poolMeta) returnResumablePools(n int) []PoolStatus {
 }

 func (p *poolMeta) DecommissionComplete(idx int) bool {
-	if p.Pools[idx].Decommission != nil {
+	if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.Complete {
 		p.Pools[idx].LastUpdate = UTCNow()
 		p.Pools[idx].Decommission.Complete = true
 		p.Pools[idx].Decommission.Failed = false
@@ -152,7 +152,7 @@ func (p *poolMeta) DecommissionComplete(idx int) bool {
 }

 func (p *poolMeta) DecommissionFailed(idx int) bool {
-	if p.Pools[idx].Decommission != nil {
+	if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.Failed {
 		p.Pools[idx].LastUpdate = UTCNow()
 		p.Pools[idx].Decommission.StartTime = time.Time{}
 		p.Pools[idx].Decommission.Complete = false
@@ -164,7 +164,7 @@ func (p *poolMeta) DecommissionFailed(idx int) bool {
 }

 func (p *poolMeta) DecommissionCancel(idx int) bool {
-	if p.Pools[idx].Decommission != nil {
+	if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.Canceled {
 		p.Pools[idx].LastUpdate = UTCNow()
 		p.Pools[idx].Decommission.StartTime = time.Time{}
 		p.Pools[idx].Decommission.Complete = false
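Review note: the three guards above make the Complete/Failed/Canceled transitions one-shot — a pool already in that terminal state is not transitioned (and its side effects are not re-run) a second time. A minimal sketch of the guard pattern, using an illustrative state type rather than MinIO's poolMeta:

	package sketch

	// state is an illustrative stand-in for a pool's decommission record.
	type state struct {
		Complete bool
		Failed   bool
		Canceled bool
	}

	// markComplete flips the flag only when it is not already set and reports
	// whether a transition actually happened, so callers can skip follow-up
	// work (saving metadata, firing the canceler) on repeated calls.
	func markComplete(s *state) bool {
		if s == nil || s.Complete {
			return false
		}
		s.Complete = true
		s.Failed = false
		s.Canceled = false
		return true
	}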
@@ -466,24 +466,32 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
 	// if no update is needed return right away.
 	if !update {
+		z.poolMeta = meta
+
 		// We are only supporting single pool decommission at this time
 		// so it makes sense to only resume single pools at any given
 		// time, in future meta.returnResumablePools() might take
 		// '-1' as argument to decommission multiple pools at a time
 		// but this is not a priority at the moment.
 		for _, pool := range meta.returnResumablePools(1) {
+			idx := globalEndpoints.GetPoolIdx(pool.CmdLine)
+			if idx == -1 {
+				return fmt.Errorf("unexpected state present for decommission status pool(%s) not found", pool.CmdLine)
+			}
+			if globalEndpoints[idx].Endpoints[0].IsLocal {
 			go func(pool PoolStatus) {
 				switch err := z.Decommission(ctx, pool.ID); err {
-				case errDecommissionAlreadyRunning:
-					fallthrough
 				case nil:
-					z.doDecommissionInRoutine(ctx, pool.ID)
+					// we already started decommission
+				case errDecommissionAlreadyRunning:
+					// A previous decommission running found restart it.
+					z.doDecommissionInRoutine(ctx, idx)
 				default:
 					logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pool, err))
 				}
 			}(pool)
+			}
 		}
-		z.poolMeta = meta
 		return nil
 	}
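Review note: on startup the resumable pool is matched back to its index via the command line recorded in its metadata, and the decommission routine is restarted only on the server that owns the pool's first endpoint, so a single node ends up driving the resume. A rough sketch of that gate, with illustrative types in place of globalEndpoints:

	package sketch

	// endpoint and poolEndpoints are illustrative stand-ins for the server's
	// endpoint bookkeeping, not MinIO's real types.
	type endpoint struct {
		Host    string
		IsLocal bool
	}

	type poolEndpoints struct {
		CmdLine   string
		Endpoints []endpoint
	}

	// resumeIndex resolves a resumable pool back to its index by command line
	// and reports whether this server (owner of the pool's first endpoint)
	// should be the one to restart the decommission routine.
	func resumeIndex(cmdLine string, pools []poolEndpoints) (idx int, resumeHere bool) {
		for i, p := range pools {
			if p.CmdLine == cmdLine {
				return i, len(p.Endpoints) > 0 && p.Endpoints[0].IsLocal
			}
		}
		return -1, false // unexpected: decommission status refers to an unknown pool
	}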
@@ -757,8 +765,8 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx int)
 }

 func (z *erasureServerPools) IsSuspended(idx int) bool {
-	z.poolMetaMutex.Lock()
-	defer z.poolMetaMutex.Unlock()
+	z.poolMetaMutex.RLock()
+	defer z.poolMetaMutex.RUnlock()
 	return z.poolMeta.IsSuspended(idx)
 }
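Review note: IsSuspended() only reads poolMeta, so the hunk above switches it from the exclusive lock to the RWMutex read lock, letting concurrent status checks proceed in parallel while writers still take the exclusive lock. A minimal sketch of the pattern:

	package sketch

	import "sync"

	type meta struct {
		mu        sync.RWMutex
		suspended map[int]bool
	}

	// IsSuspended takes only the read lock; any number of readers may hold it
	// at once, so status checks no longer serialize behind metadata writers.
	func (m *meta) IsSuspended(idx int) bool {
		m.mu.RLock()
		defer m.mu.RUnlock()
		return m.suspended[idx]
	}

	// Suspend mutates state and therefore still needs the exclusive write lock.
	func (m *meta) Suspend(idx int) {
		m.mu.Lock()
		defer m.mu.Unlock()
		if m.suspended == nil {
			m.suspended = make(map[int]bool)
		}
		m.suspended[idx] = true
	}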
@@ -920,6 +928,7 @@ func (z *erasureServerPools) CompleteDecommission(ctx context.Context, idx int)
 	defer z.poolMetaMutex.Unlock()

 	if z.poolMeta.DecommissionComplete(idx) {
+		z.decommissionCancelers[idx]() // cancel any active thread.
 		if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
 			return err
 		}
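Review note: completing a decommission now also fires the stored canceler to stop any active decommission goroutine, which ties into the "double-cancel panic" item in the commit message. The mechanics of that canceler are not shown in this diff; the sketch below only illustrates one common way (an assumption, not MinIO's code) to make a canceler safe to invoke from several paths — complete, cancel, error — when the underlying cleanup, such as closing a channel, must run at most once:

	package sketch

	import "sync"

	// canceler wraps a stop channel so Cancel can be called from multiple code
	// paths without a double close (which would panic).
	type canceler struct {
		once sync.Once
		stop chan struct{}
	}

	func newCanceler() *canceler {
		return &canceler{stop: make(chan struct{})}
	}

	// Cancel closes the stop channel exactly once; later calls are no-ops.
	func (c *canceler) Cancel() {
		c.once.Do(func() { close(c.stop) })
	}

	// Done exposes the channel a worker goroutine selects on to notice the stop.
	func (c *canceler) Done() <-chan struct{} {
		return c.stop
	}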