add missing cleanupStaleMultipartUploads (#10325)

fixes #10319
This commit is contained in:
Harshavardhana 2020-08-21 21:39:54 -07:00 committed by GitHub
parent 23774353b7
commit 95411228db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import (
"sort"
"strconv"
"strings"
"time"
"github.com/minio/minio-go/v7/pkg/set"
xhttp "github.com/minio/minio/cmd/http"
@ -69,6 +70,56 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
g.Wait()
}
// Clean-up the old multipart uploads. Should be run in a Go routine.
func (er erasureObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
var disk StorageAPI
for _, d := range er.getLoadBalancedDisks() {
if d != nil {
disk = d
break
}
}
if disk == nil {
continue
}
er.cleanupStaleMultipartUploadsOnDisk(ctx, disk, expiry)
}
}
}
// Remove the old multipart uploads on the given disk.
func (er erasureObjects) cleanupStaleMultipartUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) {
now := time.Now()
shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "", -1)
if err != nil {
return
}
for _, shaDir := range shaDirs {
uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir, -1)
if err != nil {
continue
}
for _, uploadIDDir := range uploadIDDirs {
uploadIDPath := pathJoin(shaDir, uploadIDDir)
fi, err := disk.ReadVersion(minioMetaMultipartBucket, uploadIDPath, "")
if err != nil {
continue
}
if now.Sub(fi.ModTime) > expiry {
er.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, fi.Erasure.DataBlocks+1)
}
}
}
}
// ListMultipartUploads - lists all the pending multipart
// uploads for a particular object in a bucket.
//

View File

@ -361,6 +361,9 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
bp: bp,
mrfOpCh: make(chan partialOperation, 10000),
}
go s.sets[i].cleanupStaleMultipartUploads(ctx,
GlobalMultipartCleanupInterval, GlobalMultipartExpiry)
}
// Start the disk monitoring and connect routine.