fix panic in batch replicate (#20014)

Fixes:

```
panic: send on closed channel
	panic: close of closed channel

goroutine 878 [running]:
github.com/minio/minio/internal/ioutil.SafeClose[...](...)
	/Users/kp/code/src/github.com/minio/minio/internal/ioutil/ioutil.go:407
github.com/minio/minio/cmd.(*erasureServerPools).Walk.func2.2()
	/Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:2229 +0xc0
panic({0x108c25e60?, 0x1090b28d0?})
	/usr/local/go/src/runtime/panic.go:770 +0x124
github.com/minio/minio/cmd.(*erasureServerPools).Walk.func2.3({{0x1400e397316, 0x5}, {0x1400d88b8a8, 0x8}, {0x1f99d80, 0xede101c42, 0x0}, 0x3bc, 0x0, 0x0, ...})
	/Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:2235 +0xb4
github.com/minio/minio/cmd.(*erasureServerPools).Walk.func2()
	/Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:2277 +0xabc
created by github.com/minio/minio/cmd.(*erasureServerPools).Walk in goroutine 575
	/Users/kp/code/src/github.com/minio/minio/cmd/erasure-server-pool.go:2210 +0x33c
```
This commit is contained in:
Poorna 2024-06-28 18:20:47 -07:00 committed by GitHub
parent 68a9f521d5
commit 91faaa1387
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 73 additions and 75 deletions

View File

@ -1071,86 +1071,84 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
var (
walkCh = make(chan itemOrErr[ObjectInfo], 100)
slowCh = make(chan itemOrErr[ObjectInfo], 100)
)
if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() {
go func() {
// Snowball currently needs the high level minio-go Client, not the Core one
cl, err := miniogo.New(u.Host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport(),
BucketLookup: lookupStyle(r.Target.Path),
})
if err != nil {
batchLogIf(ctx, err)
return
}
// Already validated before arriving here
smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan)
batch := make([]ObjectInfo, 0, *r.Source.Snowball.Batch)
writeFn := func(batch []ObjectInfo) {
if len(batch) > 0 {
if err := r.writeAsArchive(ctx, api, cl, batch); err != nil {
batchLogIf(ctx, err)
for _, b := range batch {
slowCh <- itemOrErr[ObjectInfo]{Item: b}
}
} else {
ri.trackCurrentBucketBatch(r.Source.Bucket, batch)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
}
}
}
for obj := range walkCh {
if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) {
slowCh <- obj
continue
}
batch = append(batch, obj.Item)
if len(batch) < *r.Source.Snowball.Batch {
continue
}
writeFn(batch)
batch = batch[:0]
}
writeFn(batch)
xioutil.SafeClose(slowCh)
}()
} else {
slowCh = walkCh
}
workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
if err != nil {
return err
}
wk, err := workers.New(workerSize)
if err != nil {
// invalid worker size.
return err
}
walkQuorum := env.Get("_MINIO_BATCH_REPLICATION_WALK_QUORUM", "strict")
if walkQuorum == "" {
walkQuorum = "strict"
}
retryAttempts := ri.RetryAttempts
retry := false
for attempts := 1; attempts <= retryAttempts; attempts++ {
attempts := attempts
var (
walkCh = make(chan itemOrErr[ObjectInfo], 100)
slowCh = make(chan itemOrErr[ObjectInfo], 100)
)
if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() {
go func() {
// Snowball currently needs the high level minio-go Client, not the Core one
cl, err := miniogo.New(u.Host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport(),
BucketLookup: lookupStyle(r.Target.Path),
})
if err != nil {
batchLogIf(ctx, err)
return
}
// Already validated before arriving here
smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan)
batch := make([]ObjectInfo, 0, *r.Source.Snowball.Batch)
writeFn := func(batch []ObjectInfo) {
if len(batch) > 0 {
if err := r.writeAsArchive(ctx, api, cl, batch); err != nil {
batchLogIf(ctx, err)
for _, b := range batch {
slowCh <- itemOrErr[ObjectInfo]{Item: b}
}
} else {
ri.trackCurrentBucketBatch(r.Source.Bucket, batch)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
}
}
}
for obj := range walkCh {
if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) {
slowCh <- obj
continue
}
batch = append(batch, obj.Item)
if len(batch) < *r.Source.Snowball.Batch {
continue
}
writeFn(batch)
batch = batch[:0]
}
writeFn(batch)
xioutil.SafeClose(slowCh)
}()
} else {
slowCh = walkCh
}
workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
if err != nil {
return err
}
wk, err := workers.New(workerSize)
if err != nil {
// invalid worker size.
return err
}
walkQuorum := env.Get("_MINIO_BATCH_REPLICATION_WALK_QUORUM", "strict")
if walkQuorum == "" {
walkQuorum = "strict"
}
ctx, cancel := context.WithCancel(ctx)
// one of source/target is s3, skip delete marker and all versions under the same object name.
s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3