retry and resume decom operation upon retriable failures (#15244)

it is possible in a k8s-like system reading pool.bin
might not have quorum during startup, however, add
a way to retry after this failure.
This commit is contained in:
Harshavardhana 2022-07-07 12:31:44 -07:00 committed by GitHub
parent c1901f4e12
commit 5802df4365
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -22,6 +22,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"math/rand"
"net/http" "net/http"
"sort" "sort"
"strconv" "strconv"
@ -523,14 +524,26 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
} }
if globalEndpoints[idx].Endpoints[0].IsLocal { if globalEndpoints[idx].Endpoints[0].IsLocal {
go func(pool PoolStatus) { go func(pool PoolStatus) {
switch err := z.Decommission(ctx, pool.ID); err { r := rand.New(rand.NewSource(time.Now().UnixNano()))
case nil: for {
if err := z.Decommission(ctx, pool.ID); err != nil {
switch err {
// we already started decommission // we already started decommission
case errDecommissionAlreadyRunning: case errDecommissionAlreadyRunning:
// A previous decommission running found restart it. // A previous decommission running found restart it.
z.doDecommissionInRoutine(ctx, idx) z.doDecommissionInRoutine(ctx, idx)
return
default: default:
if configRetriableErrors(err) {
logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w: retrying..", pool, err))
time.Sleep(time.Second + time.Duration(r.Float64()*float64(5*time.Second)))
continue
}
logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pool, err)) logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pool, err))
return
}
}
break
} }
}(pool) }(pool)
} }
@ -984,7 +997,9 @@ func (z *erasureServerPools) DecommissionCancel(ctx context.Context, idx int) (e
defer z.poolMetaMutex.Unlock() defer z.poolMetaMutex.Unlock()
if z.poolMeta.DecommissionCancel(idx) { if z.poolMeta.DecommissionCancel(idx) {
defer z.decommissionCancelers[idx]() // cancel any active thread. if fn := z.decommissionCancelers[idx]; fn != nil {
defer fn() // cancel any active thread.
}
if err = z.poolMeta.save(ctx, z.serverPools); err != nil { if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
return err return err
} }
@ -1006,7 +1021,9 @@ func (z *erasureServerPools) DecommissionFailed(ctx context.Context, idx int) (e
defer z.poolMetaMutex.Unlock() defer z.poolMetaMutex.Unlock()
if z.poolMeta.DecommissionFailed(idx) { if z.poolMeta.DecommissionFailed(idx) {
defer z.decommissionCancelers[idx]() // cancel any active thread. if fn := z.decommissionCancelers[idx]; fn != nil {
defer fn() // cancel any active thread.
}
if err = z.poolMeta.save(ctx, z.serverPools); err != nil { if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
return err return err
} }
@ -1028,7 +1045,9 @@ func (z *erasureServerPools) CompleteDecommission(ctx context.Context, idx int)
defer z.poolMetaMutex.Unlock() defer z.poolMetaMutex.Unlock()
if z.poolMeta.DecommissionComplete(idx) { if z.poolMeta.DecommissionComplete(idx) {
defer z.decommissionCancelers[idx]() // cancel any active thread. if fn := z.decommissionCancelers[idx]; fn != nil {
defer fn() // cancel any active thread.
}
if err = z.poolMeta.save(ctx, z.serverPools); err != nil { if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
return err return err
} }