diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 7f62ecc59..68317b7b1 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -1064,8 +1064,6 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { go func() { - defer xioutil.SafeClose(slowCh) - // Snowball currently needs the high level minio-go Client, not the Core one cl, err := miniogo.New(u.Host, &miniogo.Options{ Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), @@ -1081,31 +1079,8 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba // Already validated before arriving here smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan) - var ( - obj = ObjectInfo{} - batch = make([]ObjectInfo, 0, *r.Source.Snowball.Batch) - valid = true - ) - - for valid { - obj, valid = <-walkCh - - if !valid { - goto write - } - - if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) { - slowCh <- obj - continue - } - - batch = append(batch, obj) - - if len(batch) < *r.Source.Snowball.Batch { - continue - } - - write: + batch := make([]ObjectInfo, 0, *r.Source.Snowball.Batch) + writeFn := func(batch []ObjectInfo) { if len(batch) > 0 { if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { logger.LogIf(ctx, err) @@ -1118,9 +1093,24 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba // persist in-memory state to disk after every 10secs. logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) } - batch = batch[:0] } } + for obj := range walkCh { + if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) { + slowCh <- obj + continue + } + + batch = append(batch, obj) + + if len(batch) < *r.Source.Snowball.Batch { + continue + } + writeFn(batch) + batch = batch[:0] + } + writeFn(batch) + xioutil.SafeClose(slowCh) }() } else { slowCh = walkCh