Handle failures in pool rebalancing (#19623)

This commit is contained in:
Praveen raj Mani 2024-04-27 00:59:28 +05:30 committed by GitHub
parent 4caa3422bd
commit 410a1ac040
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -376,7 +376,7 @@ func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
} }
func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) { func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
doneCh := make(chan struct{}) doneCh := make(chan error, 1)
defer xioutil.SafeClose(doneCh) defer xioutil.SafeClose(doneCh)
// Save rebalance.bin periodically. // Save rebalance.bin periodically.
@ -391,48 +391,50 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
timer := time.NewTimer(randSleepFor()) timer := time.NewTimer(randSleepFor())
defer timer.Stop() defer timer.Stop()
var rebalDone bool
var traceMsg string var (
quit bool
traceMsg string
)
for { for {
select { select {
case <-doneCh: case rebalErr := <-doneCh:
// rebalance completed for poolIdx quit = true
now := time.Now() now := time.Now()
var status rebalStatus
switch {
case errors.Is(rebalErr, context.Canceled):
status = rebalStopped
traceMsg = fmt.Sprintf("stopped at %s", now)
case rebalErr == nil:
status = rebalCompleted
traceMsg = fmt.Sprintf("completed at %s", now)
default:
status = rebalFailed
traceMsg = fmt.Sprintf("stopped at %s with err: %v", now, rebalErr)
}
z.rebalMu.Lock() z.rebalMu.Lock()
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted z.rebalMeta.PoolStats[poolIdx].Info.Status = status
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
z.rebalMu.Unlock() z.rebalMu.Unlock()
rebalDone = true
traceMsg = fmt.Sprintf("completed at %s", now)
case <-ctx.Done():
// rebalance stopped for poolIdx
now := time.Now()
z.rebalMu.Lock()
z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped
z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
z.rebalMeta.cancel = nil // remove the already used context.CancelFunc
z.rebalMu.Unlock()
rebalDone = true
traceMsg = fmt.Sprintf("stopped at %s", now)
case <-timer.C: case <-timer.C:
traceMsg = fmt.Sprintf("saved at %s", time.Now()) traceMsg = fmt.Sprintf("saved at %s", time.Now())
} }
stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg) stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats) err := z.saveRebalanceStats(GlobalContext, poolIdx, rebalSaveStats)
stopFn(err) stopFn(err)
rebalanceLogIf(ctx, err) rebalanceLogIf(GlobalContext, err)
timer.Reset(randSleepFor())
if rebalDone { if quit {
return return
} }
timer.Reset(randSleepFor())
} }
}() }()
@ -441,6 +443,7 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
doneCh <- ctx.Err()
return return
default: default:
} }
@ -457,14 +460,15 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
if errors.Is(err, errServerNotInitialized) || errors.Is(err, errBucketMetadataNotInitialized) { if errors.Is(err, errServerNotInitialized) || errors.Is(err, errBucketMetadataNotInitialized) {
continue continue
} }
rebalanceLogIf(ctx, err) rebalanceLogIf(GlobalContext, err)
doneCh <- err
return return
} }
stopFn(nil) stopFn(nil)
z.bucketRebalanceDone(bucket, poolIdx) z.bucketRebalanceDone(bucket, poolIdx)
} }
rebalanceLogEvent(ctx, "Pool %d rebalancing is done", poolIdx+1) rebalanceLogEvent(GlobalContext, "Pool %d rebalancing is done", poolIdx+1)
return err return err
} }