Don't close transition task channel on server exit (#16627)

This commit is contained in:
Krishnan Parthasarathi 2023-02-15 22:09:25 -08:00 committed by GitHub
parent c33a237067
commit d136ac0596
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 11 deletions

View File

@ -152,7 +152,6 @@ type transitionTask struct {
} }
type transitionState struct { type transitionState struct {
once sync.Once
transitionCh chan transitionTask transitionCh chan transitionTask
ctx context.Context ctx context.Context
@ -169,10 +168,7 @@ type transitionState struct {
func (t *transitionState) queueTransitionTask(oi ObjectInfo, sc string) { func (t *transitionState) queueTransitionTask(oi ObjectInfo, sc string) {
select { select {
case <-GlobalContext.Done(): case <-t.ctx.Done():
t.once.Do(func() {
close(t.transitionCh)
})
case t.transitionCh <- transitionTask{objInfo: oi, tier: sc}: case t.transitionCh <- transitionTask{objInfo: oi, tier: sc}:
default: default:
} }
@ -214,20 +210,20 @@ func (t *transitionState) ActiveTasks() int {
} }
// worker waits for transition tasks // worker waits for transition tasks
func (t *transitionState) worker(ctx context.Context, objectAPI ObjectLayer) { func (t *transitionState) worker(objectAPI ObjectLayer) {
for { for {
select { select {
case <-t.killCh: case <-t.killCh:
return return
case <-ctx.Done(): case <-t.ctx.Done():
return return
case task, ok := <-t.transitionCh: case task, ok := <-t.transitionCh:
if !ok { if !ok {
return return
} }
atomic.AddInt32(&t.activeTasks, 1) atomic.AddInt32(&t.activeTasks, 1)
if err := transitionObject(ctx, objectAPI, task.objInfo, task.tier); err != nil { if err := transitionObject(t.ctx, objectAPI, task.objInfo, task.tier); err != nil {
logger.LogIf(ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w", logger.LogIf(t.ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w",
task.objInfo.Bucket, task.objInfo.Name, task.objInfo.VersionID, err)) task.objInfo.Bucket, task.objInfo.Name, task.objInfo.VersionID, err))
} else { } else {
ts := tierStats{ ts := tierStats{
@ -278,7 +274,7 @@ func (t *transitionState) UpdateWorkers(n int) {
func (t *transitionState) updateWorkers(n int) { func (t *transitionState) updateWorkers(n int) {
for t.numWorkers < n { for t.numWorkers < n {
go t.worker(t.ctx, t.objAPI) go t.worker(t.objAPI)
t.numWorkers++ t.numWorkers++
} }

View File

@ -483,7 +483,7 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
if opts.VersionID != "" { if opts.VersionID != "" {
fi.VersionID = opts.VersionID fi.VersionID = opts.VersionID
} }
fi.SetTierFreeVersionID(mustGetUUID())
disks := er.getDisks() disks := er.getDisks()
g := errgroup.WithNErrs(len(disks)) g := errgroup.WithNErrs(len(disks))
for index := range disks { for index := range disks {