From 4afbb897749ed778c6fbbbf1136421af73a0707a Mon Sep 17 00:00:00 2001
From: Anis Elleuch
Date: Tue, 15 Feb 2022 18:25:47 +0100
Subject: [PATCH] nas: Clean stale background appended files (#14295)

When more than one gateway reads and writes from the same mount point,
with a load balancer pointing to those gateways, each gateway will try
to create its own temporary append file but fail to clean it up later
when it is no longer needed.

This commit adds a routine that checks all upload IDs saved in the
multipart directory and removes any stale entry with the same upload ID
from memory and from the temporary background-append folder as well.
---
 cmd/fs-v1-multipart.go | 117 +++++++++++++++++++++++++++++------------
 cmd/fs-v1.go           |   4 ++
 2 files changed, 86 insertions(+), 35 deletions(-)

diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index 6bdc5bb42..8d59338a7 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"path/filepath"
 	"sort"
 	"strconv"
 	"strings"
@@ -35,6 +36,11 @@ import (
 	"github.com/minio/pkg/trie"
 )
+const (
+	bgAppendsDirName         = "bg-appends"
+	bgAppendsCleanupInterval = 10 * time.Minute
+)
+
 // Returns EXPORT/.minio.sys/multipart/SHA256/UPLOADID
 func (fs *FSObjects) getUploadIDDir(bucket, object, uploadID string) string {
 	return pathJoin(fs.fsPath, minioMetaMultipartBucket, getSHA256Hash([]byte(pathJoin(bucket, object))), uploadID)
 }
@@ -74,7 +80,7 @@ func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploa
 	file := fs.appendFileMap[uploadID]
 	if file == nil {
 		file = &fsAppendFile{
-			filePath: pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID())),
+			filePath: pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, bgAppendsDirName, fmt.Sprintf("%s.%s", uploadID, mustGetUUID())),
 		}
 		fs.appendFileMap[uploadID] = file
 	}
@@ -643,7 +649,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	}
 
 	appendFallback := true // In case background-append did not append the required parts.
-	appendFilePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID()))
+	appendFilePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, "bg-appends", fmt.Sprintf("%s.%s", uploadID, mustGetUUID()))
 
 	// Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend()
 	// to take care of the following corner case:
@@ -843,58 +849,99 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
 	return nil
 }
 
+// Return all upload IDs with the full path of each upload-id directory.
+// Do not return an error as this is a lazy operation
+func (fs *FSObjects) getAllUploadIDs(ctx context.Context) (result map[string]string) {
+	result = make(map[string]string)
+
+	entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
+	if err != nil {
+		return
+	}
+	for _, entry := range entries {
+		uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
+		if err != nil {
+			continue
+		}
+		// Remove the trailing slash separator
+		for i := range uploadIDs {
+			uploadID := strings.TrimSuffix(uploadIDs[i], SlashSeparator)
+			result[uploadID] = pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID)
+		}
+	}
+	return
+}
+
 // Removes multipart uploads if any older than `expiry` duration
 // on all buckets for every `cleanupInterval`, this function is
 // blocking and should be run in a go-routine.
 func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
-	timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
-	defer timer.Stop()
+	expiryUploadsTimer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
+	defer expiryUploadsTimer.Stop()
+
+	bgAppendTmpCleaner := time.NewTimer(bgAppendsCleanupInterval)
+	defer bgAppendTmpCleaner.Stop()
 
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-timer.C:
-			// Reset for the next interval
-			timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
+		case <-bgAppendTmpCleaner.C:
+			bgAppendTmpCleaner.Reset(bgAppendsCleanupInterval)
 
-			expiry := globalAPIConfig.getStaleUploadsExpiry()
+			foundUploadIDs := fs.getAllUploadIDs(ctx)
 
-			now := time.Now()
-			entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
+			// Remove background append entries from memory
+			fs.appendFileMapMu.Lock()
+			for uploadID := range fs.appendFileMap {
+				_, ok := foundUploadIDs[uploadID]
+				if !ok {
+					delete(fs.appendFileMap, uploadID)
+				}
+			}
+			fs.appendFileMapMu.Unlock()
+
+			// Remove stale background append files from disk
+			bgAppendsDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, bgAppendsDirName)
+			entries, err := readDir(bgAppendsDir)
 			if err != nil {
-				continue
+				break
 			}
 			for _, entry := range entries {
-				uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
+				uploadID := strings.Split(entry, ".")[0]
+				_, ok := foundUploadIDs[uploadID]
+				if !ok {
+					fsRemoveFile(ctx, pathJoin(bgAppendsDir, entry))
+				}
+			}
+
+		case <-expiryUploadsTimer.C:
+			// Reset for the next interval
+			expiryUploadsTimer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
+
+			expiry := globalAPIConfig.getStaleUploadsExpiry()
+			now := time.Now()
+
+			uploadIDs := fs.getAllUploadIDs(ctx)
+
+			for uploadID, path := range uploadIDs {
+				fi, err := fsStatDir(ctx, path)
 				if err != nil {
 					continue
 				}
+				if now.Sub(fi.ModTime()) > expiry {
+					fsRemoveAll(ctx, path)
+					// Remove upload ID parent directory if empty
+					fsRemoveDir(ctx, filepath.Dir(path))
 
-				// Remove the trailing slash separator
-				for i := range uploadIDs {
-					uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
-				}
-
-				for _, uploadID := range uploadIDs {
-					fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
-					if err != nil {
-						continue
-					}
-					if now.Sub(fi.ModTime()) > expiry {
-						fsRemoveAll(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
-						// It is safe to ignore any directory not empty error (in case there were multiple uploadIDs on the same object)
-						fsRemoveDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
-
-						// Remove uploadID from the append file map and its corresponding temporary file
-						fs.appendFileMapMu.Lock()
-						bgAppend, ok := fs.appendFileMap[uploadID]
-						if ok {
-							_ = fsRemoveFile(ctx, bgAppend.filePath)
-							delete(fs.appendFileMap, uploadID)
-						}
-						fs.appendFileMapMu.Unlock()
+					// Remove uploadID from the append file map and its corresponding temporary file
+					fs.appendFileMapMu.Lock()
+					bgAppend, ok := fs.appendFileMap[uploadID]
+					if ok {
+						_ = fsRemoveFile(ctx, bgAppend.filePath)
+						delete(fs.appendFileMap, uploadID)
 					}
+					fs.appendFileMapMu.Unlock()
 				}
 			}
 		}
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index c26fa8e25..c5b7a634c 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -106,6 +106,10 @@ func initMetaVolumeFS(fsPath, fsUUID string) error {
 		return err
 	}
 
+	if err := os.MkdirAll(pathJoin(metaTmpPath, bgAppendsDirName), 0o777); err != nil {
+		return err
+	}
+
 	if err := os.MkdirAll(pathJoin(fsPath, dataUsageBucket), 0o777); err != nil {
 		return err
 	}
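
For illustration outside the diff, here is a minimal standalone sketch of the same cleanup idea: collect the upload IDs that still have a directory under .minio.sys/multipart, then delete any bg-appends temp file whose "<uploadID>.<random>" name no longer matches a live upload. This sketch is not part of the patch; the helper names (listLiveUploadIDs, removeStaleAppends), the example paths, and the use of the standard library's os.ReadDir are assumptions made here, whereas the patch itself relies on MinIO's internal readDir, fsRemoveFile, and pathJoin helpers.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// listLiveUploadIDs scans <multipartDir>/<objectSHA256>/<uploadID>/ and
// returns the set of upload IDs that still have a multipart directory.
func listLiveUploadIDs(multipartDir string) map[string]struct{} {
	live := make(map[string]struct{})
	objects, err := os.ReadDir(multipartDir)
	if err != nil {
		return live // lazy operation: an unreadable dir means "no live uploads"
	}
	for _, obj := range objects {
		uploads, err := os.ReadDir(filepath.Join(multipartDir, obj.Name()))
		if err != nil {
			continue
		}
		for _, u := range uploads {
			live[strings.TrimSuffix(u.Name(), "/")] = struct{}{}
		}
	}
	return live
}

// removeStaleAppends deletes every "<uploadID>.<random>" file in bgAppendsDir
// whose upload ID is no longer present in the live set.
func removeStaleAppends(bgAppendsDir string, live map[string]struct{}) int {
	removed := 0
	entries, err := os.ReadDir(bgAppendsDir)
	if err != nil {
		return removed
	}
	for _, e := range entries {
		uploadID := strings.Split(e.Name(), ".")[0]
		if _, ok := live[uploadID]; !ok {
			if os.Remove(filepath.Join(bgAppendsDir, e.Name())) == nil {
				removed++
			}
		}
	}
	return removed
}

func main() {
	// Hypothetical paths for a NAS export; adjust to the real mount and fsUUID.
	multipartDir := "/mnt/export/.minio.sys/multipart"
	bgAppendsDir := "/mnt/export/.minio.sys/tmp/<fsUUID>/bg-appends"

	live := listLiveUploadIDs(multipartDir)
	fmt.Printf("removed %d stale background-append files\n", removeStaleAppends(bgAppendsDir, live))
}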