From 7fd76dbbb71eeba0dd1d7c16e7d96ec1a9deba52 Mon Sep 17 00:00:00 2001 From: Poorna Date: Thu, 21 Mar 2024 16:13:43 -0700 Subject: [PATCH] fix batch snowball to close channel after listing finishes (#19316) panic seen due to premature closing of slow channel while listing is still sending or list has already closed on the sender's side: ``` panic: close of closed channel goroutine 13666 [running]: github.com/minio/minio/internal/ioutil.SafeClose[...](0x101ff51e4?) /Users/kp/code/src/github.com/minio/minio/internal/ioutil/ioutil.go:425 +0x24 github.com/minio/minio/cmd.(*erasureServerPools).Walk.func1() /Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:2142 +0x170 created by github.com/minio/minio/cmd.(*erasureServerPools).Walk in goroutine 1189 /Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:1985 +0x228 ``` --- cmd/batch-handlers.go | 46 +++++++++++++++++-------------------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 7f62ecc59..68317b7b1 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -1064,8 +1064,6 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { go func() { - defer xioutil.SafeClose(slowCh) - // Snowball currently needs the high level minio-go Client, not the Core one cl, err := miniogo.New(u.Host, &miniogo.Options{ Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), @@ -1081,31 +1079,8 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba // Already validated before arriving here smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan) - var ( - obj = ObjectInfo{} - batch = make([]ObjectInfo, 0, *r.Source.Snowball.Batch) - valid = true - ) - - for valid { - obj, valid = <-walkCh - - if !valid { - goto write - } - - if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) { - slowCh <- obj - continue - } - - batch = append(batch, obj) - - if len(batch) < *r.Source.Snowball.Batch { - continue - } - - write: + batch := make([]ObjectInfo, 0, *r.Source.Snowball.Batch) + writeFn := func(batch []ObjectInfo) { if len(batch) > 0 { if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { logger.LogIf(ctx, err) @@ -1118,9 +1093,24 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba // persist in-memory state to disk after every 10secs. logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) } - batch = batch[:0] } } + for obj := range walkCh { + if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) { + slowCh <- obj + continue + } + + batch = append(batch, obj) + + if len(batch) < *r.Source.Snowball.Batch { + continue + } + writeFn(batch) + batch = batch[:0] + } + writeFn(batch) + xioutil.SafeClose(slowCh) }() } else { slowCh = walkCh