fix: ignore drained pool in Healing, hold lock additionally (#14080)

This commit is contained in:
Harshavardhana 2022-01-11 12:27:47 -08:00 committed by GitHub
parent 3d7c1ad31d
commit 404b05a44c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 4 deletions

View File

@ -727,6 +727,12 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in
logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx)) logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx))
} }
func (z *erasureServerPools) IsSuspended(idx int) bool {
z.poolMetaMutex.Lock()
defer z.poolMetaMutex.Unlock()
return z.poolMeta.IsSuspended(idx)
}
// Decommission - start decommission session. // Decommission - start decommission session.
func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error { func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error {
if idx < 0 { if idx < 0 {

View File

@ -272,7 +272,7 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
for index := range z.serverPools { for index := range z.serverPools {
index := index index := index
// skip suspended pools for any new I/O. // skip suspended pools for any new I/O.
if z.poolMeta.IsSuspended(index) { if z.IsSuspended(index) {
continue continue
} }
g.Go(func() error { g.Go(func() error {
@ -355,7 +355,7 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
} }
// skip all objects from suspended pools for mutating calls. // skip all objects from suspended pools for mutating calls.
if z.poolMeta.IsSuspended(pinfo.PoolIndex) && opts.Mutate { if z.IsSuspended(pinfo.PoolIndex) && opts.Mutate {
continue continue
} }
@ -886,7 +886,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error { func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error {
for idx, zone := range z.serverPools { for idx, zone := range z.serverPools {
if z.poolMeta.IsSuspended(idx) { if z.IsSuspended(idx) {
logger.LogIf(ctx, fmt.Errorf("pool %d is suspended, all writes are suspended", idx+1)) logger.LogIf(ctx, fmt.Errorf("pool %d is suspended, all writes are suspended", idx+1))
continue continue
} }
@ -1828,7 +1828,10 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
defer close(errCh) defer close(errCh)
var wg sync.WaitGroup var wg sync.WaitGroup
for _, erasureSet := range z.serverPools { for idx, erasureSet := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
for _, set := range erasureSet.sets { for _, set := range erasureSet.sets {
wg.Add(1) wg.Add(1)
go func(set *erasureObjects) { go func(set *erasureObjects) {
@ -1850,6 +1853,9 @@ func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, ver
results := make([]madmin.HealResultItem, len(z.serverPools)) results := make([]madmin.HealResultItem, len(z.serverPools))
var wg sync.WaitGroup var wg sync.WaitGroup
for idx, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
wg.Add(1) wg.Add(1)
go func(idx int, pool *erasureSets) { go func(idx int, pool *erasureSets) {
defer wg.Done() defer wg.Done()