nas: Clean stale background appended files (#14295)
When more than one gateway reads and writes from the same mount point, with a load balancer pointing at those gateways, each gateway creates its own temporary append file but may fail to clean it up once it is no longer needed. This commit adds a routine that lists all upload IDs saved in the multipart directory and removes any stale entry whose upload ID is no longer known, both from memory and from the temporary background-append folder.
commit 4afbb89774
parent 5ec57a9533
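
To make the cleanup logic easier to follow before reading the interleaved diff, here is a minimal, self-contained sketch of the same idea. This is not the MinIO code itself: the helper names (listUploadIDs, cleanStaleAppends) are hypothetical, and os.ReadDir stands in for MinIO's internal readDir. The routine builds the set of live upload IDs from the multipart directory, then deletes any background-append file whose name prefix is not in that set.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// listUploadIDs is a hypothetical stand-in for the commit's fs.getAllUploadIDs.
// It maps every upload ID found under the multipart directory to its full path,
// assuming the layout multipartDir/<SHA256(bucket/object)>/<uploadID>/.
func listUploadIDs(multipartDir string) map[string]string {
	result := make(map[string]string)
	shaDirs, err := os.ReadDir(multipartDir)
	if err != nil {
		return result // lazy operation: errors are ignored, as in the commit
	}
	for _, shaDir := range shaDirs {
		uploadDirs, err := os.ReadDir(filepath.Join(multipartDir, shaDir.Name()))
		if err != nil {
			continue
		}
		for _, u := range uploadDirs {
			result[u.Name()] = filepath.Join(multipartDir, shaDir.Name(), u.Name())
		}
	}
	return result
}

// cleanStaleAppends deletes every background-append file whose name does not
// begin with a live upload ID. Files are named "<uploadID>.<randomUUID>", so
// splitting on "." recovers the owning upload ID.
func cleanStaleAppends(multipartDir, bgAppendsDir string) {
	live := listUploadIDs(multipartDir)
	entries, err := os.ReadDir(bgAppendsDir)
	if err != nil {
		return
	}
	for _, e := range entries {
		uploadID := strings.Split(e.Name(), ".")[0]
		if _, ok := live[uploadID]; !ok {
			os.Remove(filepath.Join(bgAppendsDir, e.Name()))
		}
	}
}

func main() {
	// In the commit this logic runs inside cleanupStaleUploads on a 10-minute
	// timer; here a single pass is demonstrated against throwaway directories.
	multipart, _ := os.MkdirTemp("", "multipart")
	bgAppends, _ := os.MkdirTemp("", "bg-appends")
	defer os.RemoveAll(multipart)
	defer os.RemoveAll(bgAppends)

	// A stale append file whose upload ID no longer exists under multipartDir.
	stale := filepath.Join(bgAppends, "dead-upload-id.1234")
	os.WriteFile(stale, nil, 0o644)

	cleanStaleAppends(multipart, bgAppends)
	_, err := os.Stat(stale)
	fmt.Println("stale append file removed:", os.IsNotExist(err)) // true
}

The real implementation additionally prunes the in-memory fs.appendFileMap under its mutex, as the diff below shows.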
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"path/filepath"
 	"sort"
 	"strconv"
 	"strings"
@@ -35,6 +36,11 @@ import (
 	"github.com/minio/pkg/trie"
 )
 
+const (
+	bgAppendsDirName         = "bg-appends"
+	bgAppendsCleanupInterval = 10 * time.Minute
+)
+
 // Returns EXPORT/.minio.sys/multipart/SHA256/UPLOADID
 func (fs *FSObjects) getUploadIDDir(bucket, object, uploadID string) string {
 	return pathJoin(fs.fsPath, minioMetaMultipartBucket, getSHA256Hash([]byte(pathJoin(bucket, object))), uploadID)
@@ -74,7 +80,7 @@ func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploa
 	file := fs.appendFileMap[uploadID]
 	if file == nil {
 		file = &fsAppendFile{
-			filePath: pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID())),
+			filePath: pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, bgAppendsDirName, fmt.Sprintf("%s.%s", uploadID, mustGetUUID())),
 		}
 		fs.appendFileMap[uploadID] = file
 	}
@@ -643,7 +649,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	}
 
 	appendFallback := true // In case background-append did not append the required parts.
-	appendFilePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID()))
+	appendFilePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, "bg-appends", fmt.Sprintf("%s.%s", uploadID, mustGetUUID()))
 
 	// Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend()
 	// to take care of the following corner case:
@@ -843,48 +849,90 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
 	return nil
 }
 
-// Removes multipart uploads if any older than `expiry` duration
-// on all buckets for every `cleanupInterval`, this function is
-// blocking and should be run in a go-routine.
-func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
-	timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
-	defer timer.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-timer.C:
-			// Reset for the next interval
-			timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
-
-			expiry := globalAPIConfig.getStaleUploadsExpiry()
-
-			now := time.Now()
-			entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
-			if err != nil {
-				continue
-			}
-			for _, entry := range entries {
-				uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
-				if err != nil {
-					continue
-				}
-
-				// Remove the trailing slash separator
-				for i := range uploadIDs {
-					uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
-				}
-
-				for _, uploadID := range uploadIDs {
-					fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
-					if err != nil {
-						continue
-					}
-					if now.Sub(fi.ModTime()) > expiry {
-						fsRemoveAll(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
-						// It is safe to ignore any directory not empty error (in case there were multiple uploadIDs on the same object)
-						fsRemoveDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
+// Return all upload IDs with the full path of each upload-id directory.
+// Do not return an error as this is a lazy operation.
+func (fs *FSObjects) getAllUploadIDs(ctx context.Context) (result map[string]string) {
+	result = make(map[string]string)
+
+	entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
+	if err != nil {
+		return
+	}
+	for _, entry := range entries {
+		uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
+		if err != nil {
+			continue
+		}
+
+		// Remove the trailing slash separator
+		for i := range uploadIDs {
+			uploadID := strings.TrimSuffix(uploadIDs[i], SlashSeparator)
+			result[uploadID] = pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID)
+		}
+	}
+	return
+}
+
+// Removes multipart uploads if any older than `expiry` duration
+// on all buckets for every `cleanupInterval`, this function is
+// blocking and should be run in a go-routine.
+func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
+	expiryUploadsTimer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
+	defer expiryUploadsTimer.Stop()
+
+	bgAppendTmpCleaner := time.NewTimer(bgAppendsCleanupInterval)
+	defer bgAppendTmpCleaner.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-bgAppendTmpCleaner.C:
+			bgAppendTmpCleaner.Reset(bgAppendsCleanupInterval)
+
+			foundUploadIDs := fs.getAllUploadIDs(ctx)
+
+			// Remove stale background-append entries from memory.
+			fs.appendFileMapMu.Lock()
+			for uploadID := range fs.appendFileMap {
+				_, ok := foundUploadIDs[uploadID]
+				if !ok {
+					delete(fs.appendFileMap, uploadID)
+				}
+			}
+			fs.appendFileMapMu.Unlock()
+
+			// Remove stale background-append files from disk.
+			bgAppendsDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, bgAppendsDirName)
+			entries, err := readDir(bgAppendsDir)
+			if err != nil {
+				break
+			}
+			for _, entry := range entries {
+				uploadID := strings.Split(entry, ".")[0]
+				_, ok := foundUploadIDs[uploadID]
+				if !ok {
+					fsRemoveFile(ctx, pathJoin(bgAppendsDir, entry))
+				}
+			}
+
+		case <-expiryUploadsTimer.C:
+			// Reset for the next interval
+			expiryUploadsTimer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
+
+			expiry := globalAPIConfig.getStaleUploadsExpiry()
+			now := time.Now()
+
+			uploadIDs := fs.getAllUploadIDs(ctx)
+
+			for uploadID, path := range uploadIDs {
+				fi, err := fsStatDir(ctx, path)
+				if err != nil {
+					continue
+				}
+				if now.Sub(fi.ModTime()) > expiry {
+					fsRemoveAll(ctx, path)
+					// Remove upload ID parent directory if empty.
+					fsRemoveDir(ctx, filepath.Dir(path))
 
 					// Remove uploadID from the append file map and its corresponding temporary file
 					fs.appendFileMapMu.Lock()
@@ -898,5 +946,4 @@ func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
-				}
 			}
 		}
 	}
 }
@@ -106,6 +106,10 @@ func initMetaVolumeFS(fsPath, fsUUID string) error {
 		return err
 	}
 
+	if err := os.MkdirAll(pathJoin(metaTmpPath, bgAppendsDirName), 0o777); err != nil {
+		return err
+	}
+
 	if err := os.MkdirAll(pathJoin(fsPath, dataUsageBucket), 0o777); err != nil {
 		return err
 	}
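
For orientation, the on-disk layout after this change looks roughly as follows. The multipart path is documented by the getUploadIDDir comment in the diff above; the tmp path assumes minioMetaTmpBucket resolves to .minio.sys/tmp, and the UUID placeholders are illustrative:

EXPORT/.minio.sys/multipart/<SHA256(bucket/object)>/<uploadID>/
EXPORT/.minio.sys/tmp/<fsUUID>/bg-appends/<uploadID>.<randomUUID>

The multipart tree is the source of truth: any bg-appends entry whose <uploadID> prefix no longer appears there is considered stale and is removed, both from fs.appendFileMap in memory and from disk.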