diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index 6bdc5bb42..8d59338a7 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"path/filepath"
 	"sort"
 	"strconv"
 	"strings"
@@ -35,6 +36,11 @@ import (
 	"github.com/minio/pkg/trie"
 )
 
+const (
+	bgAppendsDirName         = "bg-appends"
+	bgAppendsCleanupInterval = 10 * time.Minute
+)
+
 // Returns EXPORT/.minio.sys/multipart/SHA256/UPLOADID
 func (fs *FSObjects) getUploadIDDir(bucket, object, uploadID string) string {
 	return pathJoin(fs.fsPath, minioMetaMultipartBucket, getSHA256Hash([]byte(pathJoin(bucket, object))), uploadID)
 }
@@ -74,7 +80,7 @@ func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploa
 	file := fs.appendFileMap[uploadID]
 	if file == nil {
 		file = &fsAppendFile{
-			filePath: pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID())),
+			filePath: pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, bgAppendsDirName, fmt.Sprintf("%s.%s", uploadID, mustGetUUID())),
 		}
 		fs.appendFileMap[uploadID] = file
 	}
@@ -643,7 +649,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	}
 
 	appendFallback := true // In case background-append did not append the required parts.
-	appendFilePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID()))
+	appendFilePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, "bg-appends", fmt.Sprintf("%s.%s", uploadID, mustGetUUID()))
 
 	// Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend()
 	// to take care of the following corner case:
@@ -843,58 +849,99 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
 	return nil
 }
 
+// Return all uploads IDs with full path of each upload-id directory.
+// Do not return an error as this is a lazy operation
+func (fs *FSObjects) getAllUploadIDs(ctx context.Context) (result map[string]string) {
+	result = make(map[string]string)
+
+	entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
+	if err != nil {
+		return
+	}
+	for _, entry := range entries {
+		uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
+		if err != nil {
+			continue
+		}
+		// Remove the trailing slash separator
+		for i := range uploadIDs {
+			uploadID := strings.TrimSuffix(uploadIDs[i], SlashSeparator)
+			result[uploadID] = pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID)
+		}
+	}
+	return
+}
+
 // Removes multipart uploads if any older than `expiry` duration
 // on all buckets for every `cleanupInterval`, this function is
 // blocking and should be run in a go-routine.
 func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
-	timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
-	defer timer.Stop()
+	expiryUploadsTimer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
+	defer expiryUploadsTimer.Stop()
+
+	bgAppendTmpCleaner := time.NewTimer(bgAppendsCleanupInterval)
+	defer bgAppendTmpCleaner.Stop()
 
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-timer.C:
-			// Reset for the next interval
-			timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
+		case <-bgAppendTmpCleaner.C:
+			bgAppendTmpCleaner.Reset(bgAppendsCleanupInterval)
 
-			expiry := globalAPIConfig.getStaleUploadsExpiry()
+			foundUploadIDs := fs.getAllUploadIDs(ctx)
 
-			now := time.Now()
-			entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
+			// Remove background append map from the memory
+			fs.appendFileMapMu.Lock()
+			for uploadID := range fs.appendFileMap {
+				_, ok := foundUploadIDs[uploadID]
+				if !ok {
+					delete(fs.appendFileMap, uploadID)
+				}
+			}
+			fs.appendFileMapMu.Unlock()
+
+			// Remove background appends file from the disk
+			bgAppendsDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, bgAppendsDirName)
+			entries, err := readDir(bgAppendsDir)
 			if err != nil {
-				continue
+				break
 			}
 			for _, entry := range entries {
-				uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
+				uploadID := strings.Split(entry, ".")[0]
+				_, ok := foundUploadIDs[uploadID]
+				if !ok {
+					fsRemoveFile(ctx, pathJoin(bgAppendsDir, entry))
+				}
+			}
+
+		case <-expiryUploadsTimer.C:
+			// Reset for the next interval
+			expiryUploadsTimer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
+
+			expiry := globalAPIConfig.getStaleUploadsExpiry()
+			now := time.Now()
+
+			uploadIDs := fs.getAllUploadIDs(ctx)
+
+			for uploadID, path := range uploadIDs {
+				fi, err := fsStatDir(ctx, path)
 				if err != nil {
 					continue
 				}
+				if now.Sub(fi.ModTime()) > expiry {
+					fsRemoveAll(ctx, path)
+					// Remove upload ID parent directory if empty
+					fsRemoveDir(ctx, filepath.Dir(path))
 
-				// Remove the trailing slash separator
-				for i := range uploadIDs {
-					uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
-				}
-
-				for _, uploadID := range uploadIDs {
-					fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
-					if err != nil {
-						continue
-					}
-					if now.Sub(fi.ModTime()) > expiry {
-						fsRemoveAll(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
-						// It is safe to ignore any directory not empty error (in case there were multiple uploadIDs on the same object)
-						fsRemoveDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
-
-						// Remove uploadID from the append file map and its corresponding temporary file
-						fs.appendFileMapMu.Lock()
-						bgAppend, ok := fs.appendFileMap[uploadID]
-						if ok {
-							_ = fsRemoveFile(ctx, bgAppend.filePath)
-							delete(fs.appendFileMap, uploadID)
-						}
-						fs.appendFileMapMu.Unlock()
+					// Remove uploadID from the append file map and its corresponding temporary file
+					fs.appendFileMapMu.Lock()
+					bgAppend, ok := fs.appendFileMap[uploadID]
+					if ok {
+						_ = fsRemoveFile(ctx, bgAppend.filePath)
+						delete(fs.appendFileMap, uploadID)
 					}
+					fs.appendFileMapMu.Unlock()
 				}
 			}
 		}
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index c26fa8e25..c5b7a634c 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -106,6 +106,10 @@ func initMetaVolumeFS(fsPath, fsUUID string) error {
 		return err
 	}
 
+	if err := os.MkdirAll(pathJoin(metaTmpPath, bgAppendsDirName), 0o777); err != nil {
+		return err
+	}
+
 	if err := os.MkdirAll(pathJoin(fsPath, dataUsageBucket), 0o777); err != nil {
 		return err
 	}
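
Note: the core shape of the new cleanupStaleUploads is a single goroutine multiplexing two independent timers in one select loop — a fast one for orphaned bg-appends temp files and the existing one for stale multipart uploads, each timer being re-armed at the top of its own case. A minimal standalone sketch of that pattern (names, intervals, and the printed actions are illustrative only, not MinIO's actual API) could look like:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cleanupLoop drives two periodic tasks from one goroutine, mirroring the
// two-timer select loop in the patch: each timer is reset at the start of
// its case so the next tick is scheduled regardless of how long the work takes.
func cleanupLoop(ctx context.Context, expiryEvery, bgAppendEvery time.Duration) {
	expiryTimer := time.NewTimer(expiryEvery)
	defer expiryTimer.Stop()

	bgAppendTimer := time.NewTimer(bgAppendEvery)
	defer bgAppendTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-bgAppendTimer.C:
			bgAppendTimer.Reset(bgAppendEvery)
			fmt.Println("scan for orphaned background-append files")
		case <-expiryTimer.C:
			expiryTimer.Reset(expiryEvery)
			fmt.Println("expire stale multipart uploads")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	go cleanupLoop(ctx, time.Second, 500*time.Millisecond)
	<-ctx.Done()
}
```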