// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	jsoniter "github.com/json-iterator/go"
	xioutil "github.com/minio/minio/internal/ioutil"
	"github.com/minio/minio/internal/logger"
	"github.com/minio/pkg/trie"
)

const (
	// bgAppendsDirName is the name of the directory, inside the per-instance
	// temporary directory, where background append files are created.
	bgAppendsDirName         = "bg-appends"
	// bgAppendsCleanupInterval is the period of the timer that triggers
	// cleanup of stale background append state in cleanupStaleUploads.
	bgAppendsCleanupInterval = 10 * time.Minute
)

// getUploadIDDir builds the directory path of a single multipart upload,
// i.e. EXPORT/.minio.sys/multipart/SHA256/UPLOADID, where SHA256 is the
// hash of the joined bucket and object names.
func (fs *FSObjects) getUploadIDDir(bucket, object, uploadID string) string {
	objectKey := []byte(pathJoin(bucket, object))
	shaName := getSHA256Hash(objectKey)
	return pathJoin(fs.fsPath, minioMetaMultipartBucket, shaName, uploadID)
}

// getMultipartSHADir builds the directory path that holds every upload ID of
// one object, i.e. EXPORT/.minio.sys/multipart/SHA256, where SHA256 is the
// hash of the joined bucket and object names.
func (fs *FSObjects) getMultipartSHADir(bucket, object string) string {
	objectKey := []byte(pathJoin(bucket, object))
	shaName := getSHA256Hash(objectKey)
	return pathJoin(fs.fsPath, minioMetaMultipartBucket, shaName)
}

// encodePartFile returns the file name used to store a part, in the form
// partNumber.etag.actualSize. The part number is rendered with at least five
// digits (zero padded).
func (fs *FSObjects) encodePartFile(partNumber int, etag string, actualSize int64) string {
	const partFileFormat = "%.5d.%s.%d"
	return fmt.Sprintf(partFileFormat, partNumber, etag, actualSize)
}

// decodePartFile parses a part file name of the form
// partNumber.etag.actualSize back into its three components. A name that does
// not split into exactly three dot-separated fields, or whose part number or
// size is not a valid integer, yields errUnexpected.
func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, actualSize int64, err error) {
	fields := strings.Split(name, ".")
	if len(fields) != 3 {
		return 0, "", 0, errUnexpected
	}

	number, numberErr := strconv.Atoi(fields[0])
	size, sizeErr := strconv.ParseInt(fields[2], 10, 64)
	if numberErr != nil || sizeErr != nil {
		return 0, "", 0, errUnexpected
	}

	return number, fields[1], size, nil
}

// backgroundAppend appends the already uploaded parts of uploadID, strictly in
// part-number order, to the append file tracked for that upload. The append
// file entry is created on first use. Appending stops at the first missing
// part number or on the first error; errors are logged, not returned.
func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploadID string) {
	fs.appendFileMapMu.Lock()
	logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
	appendFile := fs.appendFileMap[uploadID]
	if appendFile == nil {
		appendName := fmt.Sprintf("%s.%s", uploadID, mustGetUUID())
		appendFile = &fsAppendFile{
			filePath: pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, bgAppendsDirName, appendName),
		}
		fs.appendFileMap[uploadID] = appendFile
	}
	fs.appendFileMapMu.Unlock()

	appendFile.Lock()
	defer appendFile.Unlock()

	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
	names, err := readDir(uploadIDDir)
	if err != nil {
		logger.GetReqInfo(ctx).AppendTags("uploadIDDir", uploadIDDir)
		logger.LogIf(ctx, err)
		return
	}
	sort.Strings(names)

	// Parts are appended one after another, so the part wanted next is always
	// one past the number of parts appended so far.
	wanted := len(appendFile.parts) + 1

	for _, name := range names {
		if name == fs.metaJSONFile {
			continue
		}
		number, etag, size, decodeErr := fs.decodePartFile(name)
		if decodeErr != nil {
			// Not a part file name; possibly a backend filesystem specific file.
			continue
		}

		switch {
		case number < wanted:
			// This part is already in the append file.
			continue
		case number > wanted:
			// The part needed next has not been uploaded yet.
			return
		}

		partPath := pathJoin(uploadIDDir, name)
		if appendErr := xioutil.AppendFile(appendFile.filePath, partPath, globalFSOSync); appendErr != nil {
			logger.GetReqInfo(ctx).AppendTags("partPath", partPath).AppendTags("filepath", appendFile.filePath)
			logger.LogIf(ctx, appendErr)
			return
		}

		appendFile.parts = append(appendFile.parts, PartInfo{PartNumber: number, ETag: etag, ActualSize: size})
		wanted++
	}
}

// ListMultipartUploads - lists all the uploadIDs for the specified object.
// We do not support prefix based listing.
//
// Uploads are ordered by the modification time of their fs.json file. When
// uploadIDMarker is set, listing resumes right after the upload with that ID;
// if no upload has that ID nothing is listed. At most maxUploads entries are
// returned and IsTruncated reports whether more remain. A missing multipart
// directory for the object is not an error and yields an empty listing.
func (fs *FSObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, e error) {
	if err := checkListMultipartArgs(ctx, bucket, object, keyMarker, uploadIDMarker, delimiter, fs); err != nil {
		return result, toObjectErr(err)
	}

	if _, err := fs.statBucketDir(ctx, bucket); err != nil {
		return result, toObjectErr(err, bucket)
	}

	result.MaxUploads = maxUploads
	result.KeyMarker = keyMarker
	result.Prefix = object
	result.Delimiter = delimiter
	result.NextKeyMarker = object
	result.UploadIDMarker = uploadIDMarker

	// Resolve the directory once: building it hashes bucket/object, which is
	// loop-invariant work.
	shaDir := fs.getMultipartSHADir(bucket, object)

	uploadIDs, err := readDir(shaDir)
	if err != nil {
		if err == errFileNotFound {
			result.IsTruncated = false
			return result, nil
		}
		logger.LogIf(ctx, err)
		return result, toObjectErr(err)
	}

	// S3 spec says uploadIDs should be sorted based on initiated time. ModTime of fs.json
	// is the creation time of the uploadID, hence we will use that.
	uploads := make([]MultipartInfo, 0, len(uploadIDs))
	for _, uploadID := range uploadIDs {
		metaFilePath := pathJoin(shaDir, uploadID, fs.metaJSONFile)
		fi, err := fsStatFile(ctx, metaFilePath)
		if err != nil {
			return result, toObjectErr(err, bucket, object)
		}
		uploads = append(uploads, MultipartInfo{
			Object:    object,
			UploadID:  strings.TrimSuffix(uploadID, SlashSeparator),
			Initiated: fi.ModTime(),
		})
	}
	sort.Slice(uploads, func(i int, j int) bool {
		return uploads[i].Initiated.Before(uploads[j].Initiated)
	})

	uploadIndex := 0
	if uploadIDMarker != "" {
		// Advance past the marker entry itself; every entry before it is skipped.
		for uploadIndex < len(uploads) {
			isMarker := uploads[uploadIndex].UploadID == uploadIDMarker
			uploadIndex++
			if isMarker {
				break
			}
		}
	}
	for uploadIndex < len(uploads) {
		result.Uploads = append(result.Uploads, uploads[uploadIndex])
		result.NextUploadIDMarker = uploads[uploadIndex].UploadID
		uploadIndex++
		if len(result.Uploads) == maxUploads {
			break
		}
	}

	result.IsTruncated = uploadIndex < len(uploads)

	// Next markers are only meaningful when there is more to list.
	if !result.IsTruncated {
		result.NextKeyMarker = ""
		result.NextUploadIDMarker = ""
	}

	return result, nil
}

// NewMultipartUpload - initialize a new multipart upload, returns a
// unique id. The unique id returned here is of UUID form, for each
// subsequent request each UUID is unique.
//
// The upload is materialized as an upload ID directory containing an fs.json
// file that carries opts.UserDefined as metadata. If fs.json cannot be written
// the freshly created upload directory is removed again, so that no upload
// directory without metadata is left behind.
//
// Implements S3 compatible initiate multipart API.
func (fs *FSObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
	if err := checkNewMultipartArgs(ctx, bucket, object, fs); err != nil {
		return "", toObjectErr(err, bucket)
	}

	if _, err := fs.statBucketDir(ctx, bucket); err != nil {
		return "", toObjectErr(err, bucket)
	}

	// Initialize fs.json values. Serialize before touching the filesystem so
	// that a marshalling failure leaves nothing behind on disk.
	fsMeta := newFSMetaV1()
	fsMeta.Meta = opts.UserDefined

	fsMetaBytes, err := json.Marshal(fsMeta)
	if err != nil {
		logger.LogIf(ctx, err)
		return "", err
	}

	uploadID := mustGetUUID()
	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)

	if err = mkdirAll(uploadIDDir, 0o755); err != nil {
		logger.LogIf(ctx, err)
		return "", err
	}

	if err = ioutil.WriteFile(pathJoin(uploadIDDir, fs.metaJSONFile), fsMetaBytes, 0o666); err != nil {
		logger.LogIf(ctx, err)
		// Best-effort rollback: drop the upload directory and, if it became
		// empty, the per-object directory above it.
		fsRemoveAll(ctx, uploadIDDir)
		fsRemoveDir(ctx, fs.getMultipartSHADir(bucket, object))
		return "", err
	}

	return uploadID, nil
}

// CopyObjectPart - uploads a part whose content comes from an existing
// object. A source version ID other than the null version is rejected with
// VersionNotFound. The part data is taken from srcInfo.PutObjReader and
// stored by delegating to PutObjectPart on the destination upload.
func (fs *FSObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int,
	startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error,
) {
	if versionID := srcOpts.VersionID; versionID != "" && versionID != nullVersionID {
		return pi, VersionNotFound{
			Bucket:    srcBucket,
			Object:    srcObject,
			VersionID: versionID,
		}
	}

	if argErr := checkNewMultipartArgs(ctx, srcBucket, srcObject, fs); argErr != nil {
		return pi, toObjectErr(argErr)
	}

	copied, putErr := fs.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, srcInfo.PutObjReader, dstOpts)
	if putErr != nil {
		return pi, toObjectErr(putErr, dstBucket, dstObject)
	}
	return copied, nil
}

// PutObjectPart - stores one part of an ongoing multipart upload. The
// incoming data is first written to a temporary file under the per-instance
// temporary directory and then renamed into the upload ID directory under a
// name that encodes part number, ETag and actual size. After the rename a
// background append for the upload is started in a separate goroutine.
//
// A version ID other than the null version yields VersionNotFound, an unknown
// upload yields InvalidUploadID, and a short read yields IncompleteBody.
func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, e error) {
	if versionID := opts.VersionID; versionID != "" && versionID != nullVersionID {
		return pi, VersionNotFound{
			Bucket:    bucket,
			Object:    object,
			VersionID: versionID,
		}
	}

	data := r.Reader
	if argErr := checkPutObjectPartArgs(ctx, bucket, object, fs); argErr != nil {
		return pi, toObjectErr(argErr, bucket)
	}

	if _, statErr := fs.statBucketDir(ctx, bucket); statErr != nil {
		return pi, toObjectErr(statErr, bucket)
	}

	// A size below -1 is never valid input.
	if data.Size() < -1 {
		logger.LogIf(ctx, errInvalidArgument, logger.Application)
		return pi, toObjectErr(errInvalidArgument)
	}

	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)

	// Verify the upload exists up front so no data is written for an unknown upload.
	if _, metaErr := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); metaErr != nil {
		switch metaErr {
		case errFileNotFound, errFileAccessDenied:
			return pi, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
		}
		return pi, toObjectErr(metaErr, bucket, object)
	}

	tmpPartName := uploadID + "." + mustGetUUID() + "." + strconv.Itoa(partID)
	tmpPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tmpPartName)
	bytesWritten, err := fsCreateFile(ctx, tmpPartPath, data, data.Size())

	// Always attempt to remove the temporary part on return. Once the part
	// has been renamed into place there is nothing left to remove and the
	// resulting error is ignored.
	defer fsRemoveFile(ctx, tmpPartPath)

	if err != nil {
		return pi, toObjectErr(err, minioMetaTmpBucket, tmpPartPath)
	}

	// Fewer bytes than announced by the reader means the body was incomplete.
	if bytesWritten < data.Size() {
		return pi, IncompleteBody{Bucket: bucket, Object: object}
	}

	etag := r.MD5CurrentHexString()
	if etag == "" {
		etag = GenETag()
	}

	partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag, data.ActualSize()))

	// Rename must not create missing parent directories - the upload might have been aborted meanwhile.
	if err = Rename(tmpPartPath, partPath); err != nil {
		switch err {
		case errFileNotFound, errFileAccessDenied:
			return pi, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
		}
		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
	}

	go fs.backgroundAppend(ctx, bucket, object, uploadID)

	partStat, err := fsStatFile(ctx, partPath)
	if err != nil {
		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
	}

	pi = PartInfo{
		PartNumber:   partID,
		LastModified: partStat.ModTime(),
		ETag:         etag,
		Size:         partStat.Size(),
		ActualSize:   data.ActualSize(),
	}
	return pi, nil
}

// GetMultipartInfo returns the metadata stored in the fs.json of an ongoing
// multipart upload, used by callers to verify object states
// - encrypted
// - compressed
//
// An upload whose fs.json is missing or inaccessible yields InvalidUploadID.
func (fs *FSObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
	info := MultipartInfo{
		Bucket:   bucket,
		Object:   object,
		UploadID: uploadID,
	}

	if argErr := checkListPartsArgs(ctx, bucket, object, fs); argErr != nil {
		return info, toObjectErr(argErr)
	}

	// The bucket has to exist.
	if _, statErr := fs.statBucketDir(ctx, bucket); statErr != nil {
		return info, toObjectErr(statErr, bucket)
	}

	metaPath := pathJoin(fs.getUploadIDDir(bucket, object, uploadID), fs.metaJSONFile)
	if _, statErr := fsStatFile(ctx, metaPath); statErr != nil {
		switch statErr {
		case errFileNotFound, errFileAccessDenied:
			return info, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
		}
		return info, toObjectErr(statErr, bucket, object)
	}

	metaBytes, err := xioutil.ReadFile(metaPath)
	if err != nil {
		logger.LogIf(ctx, err)
		return info, toObjectErr(err, bucket, object)
	}

	var meta fsMetaV1
	if err = jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(metaBytes, &meta); err != nil {
		return info, toObjectErr(err, bucket, object)
	}

	info.UserDefined = meta.Meta
	return info, nil
}

// ListObjectParts - lists all previously uploaded parts for a given
// object and uploadID.  Takes additional input of part-number-marker
// to indicate where the listing should begin from: only parts with a
// part number greater than the marker are listed, whether or not a part
// with exactly that number exists.
//
// When several files exist for the same part number, the most recently
// modified one is reported. At most maxParts parts are returned; IsTruncated
// and NextPartNumberMarker are set when more parts remain. The metadata
// stored in the upload's fs.json is returned in UserDefined.
//
// Implements S3 compatible ListObjectParts API. The resulting
// ListPartsInfo structure is marshalled directly into XML and
// replied back to the client.
func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, e error) {
	if err := checkListPartsArgs(ctx, bucket, object, fs); err != nil {
		return result, toObjectErr(err)
	}
	result.Bucket = bucket
	result.Object = object
	result.UploadID = uploadID
	result.MaxParts = maxParts
	result.PartNumberMarker = partNumberMarker

	// Check if bucket exists
	if _, err := fs.statBucketDir(ctx, bucket); err != nil {
		return result, toObjectErr(err, bucket)
	}

	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
	if _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); err != nil {
		if err == errFileNotFound || err == errFileAccessDenied {
			return result, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
		}
		return result, toObjectErr(err, bucket, object)
	}

	entries, err := readDir(uploadIDDir)
	if err != nil {
		logger.LogIf(ctx, err)
		return result, toObjectErr(err, bucket)
	}

	// Keep, per part number, the most recently modified part file.
	partsMap := make(map[int]PartInfo)
	for _, entry := range entries {
		if entry == fs.metaJSONFile {
			continue
		}

		partNumber, currentEtag, actualSize, derr := fs.decodePartFile(entry)
		if derr != nil {
			// Skip part files whose name don't match expected format. These could be backend filesystem specific files.
			continue
		}

		entryStat, statErr := fsStatFile(ctx, pathJoin(uploadIDDir, entry))
		if statErr != nil {
			// The entry could not be stat'ed; leave it out of the listing.
			continue
		}

		currentMeta := PartInfo{
			PartNumber:   partNumber,
			ETag:         currentEtag,
			ActualSize:   actualSize,
			Size:         entryStat.Size(),
			LastModified: entryStat.ModTime(),
		}

		if cachedMeta, ok := partsMap[partNumber]; !ok || currentMeta.LastModified.After(cachedMeta.LastModified) {
			partsMap[partNumber] = currentMeta
		}
	}

	parts := make([]PartInfo, 0, len(partsMap))
	for _, partInfo := range partsMap {
		parts = append(parts, partInfo)
	}

	sort.Slice(parts, func(i int, j int) bool {
		return parts[i].PartNumber < parts[j].PartNumber
	})

	i := 0
	if partNumberMarker != 0 {
		// Start at the first part numbered above the marker. parts is sorted
		// by part number, so a binary search finds that position even when
		// no part carries the marker's number.
		i = sort.Search(len(parts), func(k int) bool {
			return parts[k].PartNumber > partNumberMarker
		})
	}

	partsCount := 0
	for partsCount < maxParts && i < len(parts) {
		result.Parts = append(result.Parts, parts[i])
		i++
		partsCount++
	}
	if i < len(parts) {
		result.IsTruncated = true
		if partsCount != 0 {
			result.NextPartNumberMarker = result.Parts[partsCount-1].PartNumber
		}
	}

	rc, _, err := fsOpenFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile), 0)
	if err != nil {
		if err == errFileNotFound || err == errFileAccessDenied {
			return result, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
		}
		return result, toObjectErr(err, bucket, object)
	}
	defer rc.Close()

	fsMetaBytes, err := ioutil.ReadAll(rc)
	if err != nil {
		return result, toObjectErr(err, bucket, object)
	}

	var fsMeta fsMetaV1
	if err = jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(fsMetaBytes, &fsMeta); err != nil {
		return result, toObjectErr(err, bucket, object)
	}

	result.UserDefined = fsMeta.Meta
	return result, nil
}

// CompleteMultipartUpload - completes an ongoing multipart
// transaction after receiving all the parts indicated by the client.
//
// Every requested part is validated against the part files present in the
// upload directory (all parts but the last must satisfy the minimum part
// size). The object data is taken from the background append file when it
// holds exactly the requested parts, otherwise the parts are appended afresh
// into a new temporary file. The object's fs.json is then written and the
// assembled file is renamed to the final object path, after which the upload
// directory is purged.
//
// The stored ETag is opts.UserDefined["etag"] when set, otherwise the value
// computed by getCompleteMultipartMD5 over the parts.
//
// Implements S3 compatible Complete multipart API.
func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, e error) {
	var actualSize int64

	if err := checkCompleteMultipartArgs(ctx, bucket, object, fs); err != nil {
		return oi, toObjectErr(err)
	}

	if _, err := fs.statBucketDir(ctx, bucket); err != nil {
		return oi, toObjectErr(err, bucket)
	}
	defer NSUpdated(bucket, object)

	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
	// Just check if the uploadID exists to avoid copy if it doesn't.
	_, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile))
	if err != nil {
		if err == errFileNotFound || err == errFileAccessDenied {
			return oi, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
		}
		return oi, toObjectErr(err, bucket, object)
	}

	// ensure that part ETag is canonicalized to strip off extraneous quotes
	for i := range parts {
		parts[i].ETag = canonicalizeETag(parts[i].ETag)
	}

	fsMeta := fsMetaV1{}

	// Allocate parts similar to incoming slice.
	fsMeta.Parts = make([]ObjectPartInfo, len(parts))

	entries, err := readDir(uploadIDDir)
	if err != nil {
		logger.GetReqInfo(ctx).AppendTags("uploadIDDir", uploadIDDir)
		logger.LogIf(ctx, err)
		return oi, err
	}

	// Create entries trie structure for prefix match
	entriesTrie := trie.NewTrie()
	for _, entry := range entries {
		entriesTrie.Insert(entry)
	}

	// Part file names resolved during validation, reused by the fallback
	// append below so the lookup is not repeated.
	partFiles := make([]string, len(parts))

	// Save consolidated actual size.
	var objectActualSize int64
	// Validate all parts and then commit to disk.
	for i, part := range parts {
		partFile := getPartFile(entriesTrie, part.PartNumber, part.ETag)
		if partFile == "" {
			return oi, InvalidPart{
				PartNumber: part.PartNumber,
				GotETag:    part.ETag,
			}
		}
		partFiles[i] = partFile

		// Read the actualSize from the pathFileName.
		subParts := strings.Split(partFile, ".")
		actualSize, err = strconv.ParseInt(subParts[len(subParts)-1], 10, 64)
		if err != nil {
			return oi, InvalidPart{
				PartNumber: part.PartNumber,
				GotETag:    part.ETag,
			}
		}

		partPath := pathJoin(uploadIDDir, partFile)

		var fi os.FileInfo
		fi, err = fsStatFile(ctx, partPath)
		if err != nil {
			if err == errFileNotFound || err == errFileAccessDenied {
				// Report which part is invalid, like the other InvalidPart returns.
				return oi, InvalidPart{
					PartNumber: part.PartNumber,
					GotETag:    part.ETag,
				}
			}
			return oi, err
		}

		fsMeta.Parts[i] = ObjectPartInfo{
			Number:     part.PartNumber,
			Size:       fi.Size(),
			ActualSize: actualSize,
		}

		// Consolidate the actual size.
		objectActualSize += actualSize

		// The size restriction below does not apply to the last part.
		if i == len(parts)-1 {
			break
		}

		// All parts except the last part has to be atleast 5MB.
		if !isMinAllowedPartSize(actualSize) {
			return oi, PartTooSmall{
				PartNumber: part.PartNumber,
				PartSize:   actualSize,
				PartETag:   part.ETag,
			}
		}
	}

	appendFallback := true // In case background-append did not append the required parts.
	appendFilePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, bgAppendsDirName, fmt.Sprintf("%s.%s", uploadID, mustGetUUID()))

	// Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend()
	// to take care of the following corner case:
	// 1. The last PutObjectPart triggers go-routine fs.backgroundAppend, this go-routine has not started yet.
	// 2. Now CompleteMultipartUpload gets called which sees that lastPart is not appended and starts appending
	//    from the beginning
	fs.backgroundAppend(ctx, bucket, object, uploadID)

	fs.appendFileMapMu.Lock()
	file := fs.appendFileMap[uploadID]
	delete(fs.appendFileMap, uploadID)
	fs.appendFileMapMu.Unlock()

	if file != nil {
		file.Lock()
		defer file.Unlock()
		// Verify that appendFile has all the parts.
		if len(file.parts) == len(parts) {
			for i := range parts {
				if parts[i].ETag != file.parts[i].ETag {
					break
				}
				if parts[i].PartNumber != file.parts[i].PartNumber {
					break
				}
				if i == len(parts)-1 {
					appendFilePath = file.filePath
					appendFallback = false
				}
			}
		}
	}

	if appendFallback {
		// The background append file does not match the requested parts:
		// discard it and assemble the object from the validated part files.
		if file != nil {
			fsRemoveFile(ctx, file.filePath)
		}
		for _, partFile := range partFiles {
			if err = xioutil.AppendFile(appendFilePath, pathJoin(uploadIDDir, partFile), globalFSOSync); err != nil {
				logger.LogIf(ctx, err)
				return oi, toObjectErr(err)
			}
		}
	}

	// Hold write lock on the object.
	destLock := fs.NewNSLock(bucket, object)
	lkctx, err := destLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return oi, err
	}
	ctx = lkctx.Context()
	defer destLock.Unlock(lkctx.Cancel)

	bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
	fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
	metaFile, err := fs.rwPool.Write(fsMetaPath)
	var freshFile bool
	if err != nil {
		if !errors.Is(err, errFileNotFound) {
			logger.LogIf(ctx, err)
			return oi, toObjectErr(err, bucket, object)
		}
		// No metadata file exists for the object yet, create one.
		metaFile, err = fs.rwPool.Create(fsMetaPath)
		if err != nil {
			logger.LogIf(ctx, err)
			return oi, toObjectErr(err, bucket, object)
		}
		freshFile = true
	}
	defer metaFile.Close()
	defer func() {
		// Remove meta file when CompleteMultipart encounters
		// any error and it is a fresh file.
		//
		// We should preserve the `fs.json` of any
		// existing object
		if e != nil && freshFile {
			tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID)
			fsRemoveMeta(ctx, bucketMetaDir, fsMetaPath, tmpDir)
		}
	}()

	// Read saved fs metadata for ongoing multipart.
	fsMetaBuf, err := xioutil.ReadFile(pathJoin(uploadIDDir, fs.metaJSONFile))
	if err != nil {
		logger.LogIf(ctx, err)
		return oi, toObjectErr(err, bucket, object)
	}
	err = json.Unmarshal(fsMetaBuf, &fsMeta)
	if err != nil {
		logger.LogIf(ctx, err)
		return oi, toObjectErr(err, bucket, object)
	}
	// Save additional metadata.
	if fsMeta.Meta == nil {
		fsMeta.Meta = make(map[string]string)
	}

	// A caller supplied etag takes precedence over the computed one.
	fsMeta.Meta["etag"] = opts.UserDefined["etag"]
	if fsMeta.Meta["etag"] == "" {
		fsMeta.Meta["etag"] = getCompleteMultipartMD5(parts)
	}

	// Save consolidated actual size.
	fsMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
	if _, err = fsMeta.WriteTo(metaFile); err != nil {
		logger.LogIf(ctx, err)
		return oi, toObjectErr(err, bucket, object)
	}

	err = fsRenameFile(ctx, appendFilePath, pathJoin(fs.fsPath, bucket, object))
	if err != nil {
		logger.LogIf(ctx, err)
		return oi, toObjectErr(err, bucket, object)
	}

	// Purge multipart folders
	{
		fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, mustGetUUID())
		defer fsRemoveAll(ctx, fsTmpObjPath) // remove multipart temporary files in background.

		// Best effort: the result of moving the upload directory aside is not checked.
		Rename(uploadIDDir, fsTmpObjPath)

		// It is safe to ignore any directory not empty error (in case there were multiple uploadIDs on the same object)
		fsRemoveDir(ctx, fs.getMultipartSHADir(bucket, object))
	}

	fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
	if err != nil {
		return oi, toObjectErr(err, bucket, object)
	}

	return fsMeta.ToObjectInfo(bucket, object, fi), nil
}

// AbortMultipartUpload - aborts an ongoing multipart operation
// signified by the input uploadID.
//
// The background append file tracked for the upload (if any) is removed and
// its map entry dropped. The upload ID directory is then moved into the
// temporary directory and deleted, and the per-object multipart directory is
// removed if that is possible. Rollback is not possible on this operation.
//
// An upload whose fs.json is missing or inaccessible yields InvalidUploadID.
//
// Implements S3 compatible Abort multipart API.
func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
	if argErr := checkAbortMultipartArgs(ctx, bucket, object, fs); argErr != nil {
		return argErr
	}

	if _, statErr := fs.statBucketDir(ctx, bucket); statErr != nil {
		return toObjectErr(statErr, bucket)
	}

	// Drop the append file in the tmp folder together with its map entry.
	fs.appendFileMapMu.Lock()
	if appendFile := fs.appendFileMap[uploadID]; appendFile != nil {
		fsRemoveFile(ctx, appendFile.filePath)
	}
	delete(fs.appendFileMap, uploadID)
	fs.appendFileMapMu.Unlock()

	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)

	// The upload must exist, which is determined by the presence of its fs.json.
	if _, metaErr := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); metaErr != nil {
		switch metaErr {
		case errFileNotFound, errFileAccessDenied:
			return InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
		}
		return toObjectErr(metaErr, bucket, object)
	}

	// Purge multipart folders: move the upload directory aside first, the
	// moved directory is deleted when this function returns.
	purgePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, mustGetUUID())
	defer fsRemoveAll(ctx, purgePath)

	Rename(uploadIDDir, purgePath)

	// A "directory not empty" failure is fine to ignore here, other
	// uploadIDs may still exist for the same object.
	fsRemoveDir(ctx, fs.getMultipartSHADir(bucket, object))

	return nil
}

// getAllUploadIDs returns a map from every upload ID found under the
// multipart directory to the full path of its upload ID directory.
// Directories that cannot be read are skipped silently; no error is
// reported since this is a lazy operation.
func (fs *FSObjects) getAllUploadIDs(ctx context.Context) (result map[string]string) {
	result = make(map[string]string)

	shaDirs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
	if err != nil {
		return result
	}

	for _, shaDir := range shaDirs {
		names, readErr := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, shaDir))
		if readErr != nil {
			continue
		}
		for _, name := range names {
			// Directory entries may carry a trailing slash separator; strip it.
			uploadID := strings.TrimSuffix(name, SlashSeparator)
			result[uploadID] = pathJoin(fs.fsPath, minioMetaMultipartBucket, shaDir, uploadID)
		}
	}
	return result
}

// cleanupStaleUploads periodically removes stale multipart state. This
// function is blocking and should be run in a go-routine; it returns when ctx
// is done.
//
// Two independent timers drive it:
//   - every bgAppendsCleanupInterval, background append entries (in memory)
//     and background append files (on disk) whose upload ID no longer exists
//     under the multipart directory are dropped;
//   - every stale-uploads cleanup interval, upload ID directories whose
//     modification time is older than the stale-uploads expiry are removed,
//     together with their background append file and map entry.
func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
	expiryUploadsTimer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
	defer expiryUploadsTimer.Stop()

	bgAppendTmpCleaner := time.NewTimer(bgAppendsCleanupInterval)
	defer bgAppendTmpCleaner.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-bgAppendTmpCleaner.C:
			foundUploadIDs := fs.getAllUploadIDs(ctx)

			// Remove background append map from the memory
			fs.appendFileMapMu.Lock()
			for uploadID := range fs.appendFileMap {
				if _, ok := foundUploadIDs[uploadID]; !ok {
					delete(fs.appendFileMap, uploadID)
				}
			}
			fs.appendFileMapMu.Unlock()

			// Remove background appends file from the disk. A failure to
			// read the directory only skips this round: the timer below must
			// still be re-armed, otherwise this cleanup would never run again.
			bgAppendsDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, bgAppendsDirName)
			if entries, err := readDir(bgAppendsDir); err == nil {
				for _, entry := range entries {
					// Append files are named "<uploadID>.<uuid>".
					uploadID := strings.Split(entry, ".")[0]
					if _, ok := foundUploadIDs[uploadID]; !ok {
						fsRemoveFile(ctx, pathJoin(bgAppendsDir, entry))
					}
				}
			}

			bgAppendTmpCleaner.Reset(bgAppendsCleanupInterval)
		case <-expiryUploadsTimer.C:
			expiry := globalAPIConfig.getStaleUploadsExpiry()
			now := time.Now()

			uploadIDs := fs.getAllUploadIDs(ctx)

			for uploadID, path := range uploadIDs {
				fi, err := fsStatDir(ctx, path)
				if err != nil {
					continue
				}
				if now.Sub(fi.ModTime()) > expiry {
					fsRemoveAll(ctx, path)
					// Remove upload ID parent directory if empty
					fsRemoveDir(ctx, filepath.Dir(path))

					// Remove uploadID from the append file map and its corresponding temporary file
					fs.appendFileMapMu.Lock()
					bgAppend, ok := fs.appendFileMap[uploadID]
					if ok {
						_ = fsRemoveFile(ctx, bgAppend.filePath)
						delete(fs.appendFileMap, uploadID)
					}
					fs.appendFileMapMu.Unlock()
				}
			}

			// Reset for the next interval
			expiryUploadsTimer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
		}
	}
}