From 91faaa13877df0e2989b1eb3821f0e82fcbbfe80 Mon Sep 17 00:00:00 2001 From: Poorna Date: Fri, 28 Jun 2024 18:20:47 -0700 Subject: [PATCH] fix panic in batch replicate (#20014) Fixes: ``` panic: send on closed channel panic: close of closed channel goroutine 878 [running]: github.com/minio/minio/internal/ioutil.SafeClose[...](...) /Users/kp/code/src/github.com/minio/minio/internal/ioutil/ioutil.go:407 github.com/minio/minio/cmd.(*erasureServerPools).Walk.func2.2() /Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:2229 +0xc0 panic({0x108c25e60?, 0x1090b28d0?}) /usr/local/go/src/runtime/panic.go:770 +0x124 github.com/minio/minio/cmd.(*erasureServerPools).Walk.func2.3({{0x1400e397316, 0x5}, {0x1400d88b8a8, 0x8}, {0x1f99d80, 0xede101c42, 0x0}, 0x3bc, 0x0, 0x0, ...}) /Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:2235 +0xb4 github.com/minio/minio/cmd.(*erasureServerPools).Walk.func2() /Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:2277 +0xabc created by github.com/minio/minio/cmd.(*erasureServerPools).Walk in goroutine 575 /Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:2210 +0x33c ``` --- cmd/batch-handlers.go | 148 +++++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 75 deletions(-) diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 0e87cc795..f96c5b7be 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -1071,86 +1071,84 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) - var ( - walkCh = make(chan itemOrErr[ObjectInfo], 100) - slowCh = make(chan itemOrErr[ObjectInfo], 100) - ) - - if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { - go func() { - // Snowball currently needs the high level minio-go Client, not the Core one - cl, err := miniogo.New(u.Host, &miniogo.Options{ - Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), - Secure: u.Scheme == "https", - Transport: getRemoteInstanceTransport(), - BucketLookup: lookupStyle(r.Target.Path), - }) - if err != nil { - batchLogIf(ctx, err) - return - } - - // Already validated before arriving here - smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan) - - batch := make([]ObjectInfo, 0, *r.Source.Snowball.Batch) - writeFn := func(batch []ObjectInfo) { - if len(batch) > 0 { - if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { - batchLogIf(ctx, err) - for _, b := range batch { - slowCh <- itemOrErr[ObjectInfo]{Item: b} - } - } else { - ri.trackCurrentBucketBatch(r.Source.Bucket, batch) - globalBatchJobsMetrics.save(job.ID, ri) - // persist in-memory state to disk after every 10secs. - batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) - } - } - } - for obj := range walkCh { - if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) { - slowCh <- obj - continue - } - - batch = append(batch, obj.Item) - - if len(batch) < *r.Source.Snowball.Batch { - continue - } - writeFn(batch) - batch = batch[:0] - } - writeFn(batch) - xioutil.SafeClose(slowCh) - }() - } else { - slowCh = walkCh - } - - workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2))) - if err != nil { - return err - } - - wk, err := workers.New(workerSize) - if err != nil { - // invalid worker size. - return err - } - - walkQuorum := env.Get("_MINIO_BATCH_REPLICATION_WALK_QUORUM", "strict") - if walkQuorum == "" { - walkQuorum = "strict" - } - retryAttempts := ri.RetryAttempts retry := false for attempts := 1; attempts <= retryAttempts; attempts++ { attempts := attempts + var ( + walkCh = make(chan itemOrErr[ObjectInfo], 100) + slowCh = make(chan itemOrErr[ObjectInfo], 100) + ) + if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { + go func() { + // Snowball currently needs the high level minio-go Client, not the Core one + cl, err := miniogo.New(u.Host, &miniogo.Options{ + Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), + Secure: u.Scheme == "https", + Transport: getRemoteInstanceTransport(), + BucketLookup: lookupStyle(r.Target.Path), + }) + if err != nil { + batchLogIf(ctx, err) + return + } + + // Already validated before arriving here + smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan) + + batch := make([]ObjectInfo, 0, *r.Source.Snowball.Batch) + writeFn := func(batch []ObjectInfo) { + if len(batch) > 0 { + if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { + batchLogIf(ctx, err) + for _, b := range batch { + slowCh <- itemOrErr[ObjectInfo]{Item: b} + } + } else { + ri.trackCurrentBucketBatch(r.Source.Bucket, batch) + globalBatchJobsMetrics.save(job.ID, ri) + // persist in-memory state to disk after every 10secs. + batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) + } + } + } + for obj := range walkCh { + if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) { + slowCh <- obj + continue + } + + batch = append(batch, obj.Item) + + if len(batch) < *r.Source.Snowball.Batch { + continue + } + writeFn(batch) + batch = batch[:0] + } + writeFn(batch) + xioutil.SafeClose(slowCh) + }() + } else { + slowCh = walkCh + } + + workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2))) + if err != nil { + return err + } + + wk, err := workers.New(workerSize) + if err != nil { + // invalid worker size. + return err + } + + walkQuorum := env.Get("_MINIO_BATCH_REPLICATION_WALK_QUORUM", "strict") + if walkQuorum == "" { + walkQuorum = "strict" + } ctx, cancel := context.WithCancel(ctx) // one of source/target is s3, skip delete marker and all versions under the same object name. s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3