/*
 * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	pathutil "path"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/minio/minio/pkg/errors"
	mioutil "github.com/minio/minio/pkg/ioutil"

	"github.com/minio/minio/pkg/hash"
)

// Returns EXPORT/.minio.sys/multipart/SHA256/UPLOADID
func (fs *fsObjects) getUploadIDDir(bucket, object, uploadID string) string {
	return pathJoin(fs.fsPath, minioMetaMultipartBucket, getSHA256Hash([]byte(pathJoin(bucket, object))), uploadID)
}

// Returns EXPORT/.minio.sys/multipart/SHA256
func (fs *fsObjects) getMultipartSHADir(bucket, object string) string {
	return pathJoin(fs.fsPath, minioMetaMultipartBucket, getSHA256Hash([]byte(pathJoin(bucket, object))))
}

// Returns partNumber.etag
func (fs *fsObjects) encodePartFile(partNumber int, etag string) string {
	return fmt.Sprintf("%.5d.%s", partNumber, etag)
}

// Returns partNumber and etag
func (fs *fsObjects) decodePartFile(name string) (partNumber int, etag string, err error) {
	result := strings.Split(name, ".")
	if len(result) != 2 {
		return 0, "", errUnexpected
	}
	partNumber, err = strconv.Atoi(result[0])
	if err != nil {
		return 0, "", errUnexpected
	}
	return partNumber, result[1], nil
}

// Appends parts to an appendFile sequentially.
func (fs *fsObjects) backgroundAppend(bucket, object, uploadID string) {
	fs.appendFileMapMu.Lock()
	file := fs.appendFileMap[uploadID]
	if file == nil {
		file = &fsAppendFile{
			filePath: pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID())),
		}
		fs.appendFileMap[uploadID] = file
	}
	fs.appendFileMapMu.Unlock()

	file.Lock()
	defer file.Unlock()

	// Since we append sequentially nextPartNumber will always be len(file.parts)+1
	nextPartNumber := len(file.parts) + 1
	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)

	entries, err := readDir(uploadIDDir)
	if err != nil {
		errorIf(err, "error reading directory %s", uploadIDDir)
		return
	}
	sort.Strings(entries)

	for _, entry := range entries {
		if entry == fsMetaJSONFile {
			continue
		}
		partNumber, etag, err := fs.decodePartFile(entry)
		if err != nil {
			errorIf(err, "unable to split the file name into partNumber and etag: %s", entry)
			return
		}
		if partNumber < nextPartNumber {
			// Part already appended.
			continue
		}
		if partNumber > nextPartNumber {
			// Required part number is not yet uploaded.
			return
		}

		partPath := pathJoin(uploadIDDir, entry)
		err = mioutil.AppendFile(file.filePath, partPath)
		if err != nil {
			errorIf(err, "Unable to append %s to %s", partPath, file.filePath)
			return
		}

		file.parts = append(file.parts, PartInfo{PartNumber: nextPartNumber, ETag: etag})
		nextPartNumber++
	}
}

// ListMultipartUploads - lists all the uploadIDs for the specified object.
// We do not support prefix based listing.
func (fs *fsObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, e error) {
	if err := checkListMultipartArgs(bucket, object, keyMarker, uploadIDMarker, delimiter, fs); err != nil {
		return result, toObjectErr(errors.Trace(err))
	}

	if _, err := fs.statBucketDir(bucket); err != nil {
		return result, toObjectErr(errors.Trace(err), bucket)
	}

	result.MaxUploads = maxUploads
	result.KeyMarker = keyMarker
	result.Prefix = object
	result.Delimiter = delimiter
	result.NextKeyMarker = object
	result.UploadIDMarker = uploadIDMarker

	uploadIDs, err := readDir(fs.getMultipartSHADir(bucket, object))
	if err != nil {
		if err == errFileNotFound {
			result.IsTruncated = false
			return result, nil
		}
		return result, toObjectErr(errors.Trace(err))
	}

	// S3 spec says uploaIDs should be sorted based on initiated time. ModTime of fs.json
	// is the creation time of the uploadID, hence we will use that.
	var uploads []MultipartInfo
	for _, uploadID := range uploadIDs {
		metaFilePath := pathJoin(fs.getMultipartSHADir(bucket, object), uploadID, fsMetaJSONFile)
		fi, err := fsStatFile(metaFilePath)
		if err != nil {
			return result, toObjectErr(err, bucket, object)
		}
		uploads = append(uploads, MultipartInfo{
			Object:    object,
			UploadID:  strings.TrimSuffix(uploadID, slashSeparator),
			Initiated: fi.ModTime(),
		})
	}
	sort.Slice(uploads, func(i int, j int) bool {
		return uploads[i].Initiated.Before(uploads[j].Initiated)
	})

	uploadIndex := 0
	if uploadIDMarker != "" {
		for uploadIndex < len(uploads) {
			if uploads[uploadIndex].UploadID != uploadIDMarker {
				uploadIndex++
				continue
			}
			if uploads[uploadIndex].UploadID == uploadIDMarker {
				uploadIndex++
				break
			}
			uploadIndex++
		}
	}
	for uploadIndex < len(uploads) {
		result.Uploads = append(result.Uploads, uploads[uploadIndex])
		result.NextUploadIDMarker = uploads[uploadIndex].UploadID
		uploadIndex++
		if len(result.Uploads) == maxUploads {
			break
		}
	}

	result.IsTruncated = uploadIndex < len(uploads)

	if !result.IsTruncated {
		result.NextKeyMarker = ""
		result.NextUploadIDMarker = ""
	}

	return result, nil
}

// NewMultipartUpload - initialize a new multipart upload, returns a
// unique id. The unique id returned here is of UUID form, for each
// subsequent request each UUID is unique.
//
// Implements S3 compatible initiate multipart API.
func (fs *fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
	if err := checkNewMultipartArgs(bucket, object, fs); err != nil {
		return "", toObjectErr(err, bucket)
	}

	if _, err := fs.statBucketDir(bucket); err != nil {
		return "", toObjectErr(err, bucket)
	}

	uploadID := mustGetUUID()
	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)

	err := mkdirAll(uploadIDDir, 0755)
	if err != nil {
		return "", errors.Trace(err)
	}

	// Initialize fs.json values.
	fsMeta := newFSMetaV1()
	fsMeta.Meta = meta

	fsMetaBytes, err := json.Marshal(fsMeta)
	if err != nil {
		return "", errors.Trace(err)
	}

	if err = ioutil.WriteFile(pathJoin(uploadIDDir, fsMetaJSONFile), fsMetaBytes, 0644); err != nil {
		return "", errors.Trace(err)
	}
	return uploadID, nil
}

// CopyObjectPart - similar to PutObjectPart but reads data from an existing
// object. Internally incoming data is written to '.minio.sys/tmp' location
// and safely renamed to '.minio.sys/multipart' for reach parts.
func (fs *fsObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int,
	startOffset int64, length int64, metadata map[string]string, srcEtag string) (pi PartInfo, e error) {

	if err := checkNewMultipartArgs(srcBucket, srcObject, fs); err != nil {
		return pi, toObjectErr(errors.Trace(err))
	}

	// Initialize pipe.
	pipeReader, pipeWriter := io.Pipe()

	go func() {
		if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcEtag); gerr != nil {
			errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
			pipeWriter.CloseWithError(gerr)
			return
		}
		pipeWriter.Close() // Close writer explicitly signalling we wrote all data.
	}()

	hashReader, err := hash.NewReader(pipeReader, length, "", "")
	if err != nil {
		return pi, toObjectErr(err, dstBucket, dstObject)
	}

	partInfo, err := fs.PutObjectPart(dstBucket, dstObject, uploadID, partID, hashReader)
	if err != nil {
		return pi, toObjectErr(err, dstBucket, dstObject)
	}

	// Explicitly close the reader.
	pipeReader.Close()

	return partInfo, nil
}

// PutObjectPart - reads incoming data until EOF for the part file on
// an ongoing multipart transaction. Internally incoming data is
// written to '.minio.sys/tmp' location and safely renamed to
// '.minio.sys/multipart' for reach parts.
func (fs *fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, data *hash.Reader) (pi PartInfo, e error) {
	if err := checkPutObjectPartArgs(bucket, object, fs); err != nil {
		return pi, toObjectErr(errors.Trace(err), bucket)
	}

	if _, err := fs.statBucketDir(bucket); err != nil {
		return pi, toObjectErr(errors.Trace(err), bucket)
	}

	// Validate input data size and it can never be less than zero.
	if data.Size() < 0 {
		return pi, toObjectErr(errors.Trace(errInvalidArgument))
	}

	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)

	// Just check if the uploadID exists to avoid copy if it doesn't.
	_, err := fsStatFile(pathJoin(uploadIDDir, fsMetaJSONFile))
	if err != nil {
		if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied {
			return pi, errors.Trace(InvalidUploadID{UploadID: uploadID})
		}
		return pi, toObjectErr(err, bucket, object)
	}

	bufSize := int64(readSizeV1)
	if size := data.Size(); size > 0 && bufSize > size {
		bufSize = size
	}
	buf := make([]byte, bufSize)

	tmpPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID+"."+mustGetUUID()+"."+strconv.Itoa(partID))
	bytesWritten, err := fsCreateFile(tmpPartPath, data, buf, data.Size())
	if err != nil {
		fsRemoveFile(tmpPartPath)
		return pi, toObjectErr(err, minioMetaTmpBucket, tmpPartPath)
	}

	// Should return IncompleteBody{} error when reader has fewer
	// bytes than specified in request header.
	if bytesWritten < data.Size() {
		fsRemoveFile(tmpPartPath)
		return pi, errors.Trace(IncompleteBody{})
	}

	// Delete temporary part in case of failure. If
	// PutObjectPart succeeds then there would be nothing to
	// delete in which case we just ignore the error.
	defer fsRemoveFile(tmpPartPath)

	etag := hex.EncodeToString(data.MD5Current())
	if etag == "" {
		etag = GenETag()
	}
	partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag))

	if err = fsRenameFile(tmpPartPath, partPath); err != nil {
		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
	}

	go fs.backgroundAppend(bucket, object, uploadID)

	fi, err := fsStatFile(partPath)
	if err != nil {
		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
	}
	return PartInfo{
		PartNumber:   partID,
		LastModified: fi.ModTime(),
		ETag:         etag,
		Size:         fi.Size(),
	}, nil
}

// ListObjectParts - lists all previously uploaded parts for a given
// object and uploadID.  Takes additional input of part-number-marker
// to indicate where the listing should begin from.
//
// Implements S3 compatible ListObjectParts API. The resulting
// ListPartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func (fs *fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (result ListPartsInfo, e error) {
	if err := checkListPartsArgs(bucket, object, fs); err != nil {
		return result, toObjectErr(errors.Trace(err))
	}
	result.Bucket = bucket
	result.Object = object
	result.UploadID = uploadID
	result.MaxParts = maxParts
	result.PartNumberMarker = partNumberMarker

	// Check if bucket exists
	if _, err := fs.statBucketDir(bucket); err != nil {
		return result, toObjectErr(errors.Trace(err), bucket)
	}

	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
	_, err := fsStatFile(pathJoin(uploadIDDir, fsMetaJSONFile))
	if err != nil {
		if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied {
			return result, errors.Trace(InvalidUploadID{UploadID: uploadID})
		}
		return result, toObjectErr(errors.Trace(err), bucket, object)
	}

	entries, err := readDir(uploadIDDir)
	if err != nil {
		return result, toObjectErr(errors.Trace(err), bucket)
	}

	partsMap := make(map[int]string)
	for _, entry := range entries {
		if entry == fsMetaJSONFile {
			continue
		}
		partNumber, etag1, err := fs.decodePartFile(entry)
		if err != nil {
			return result, toObjectErr(errors.Trace(err))
		}
		etag2, ok := partsMap[partNumber]
		if !ok {
			partsMap[partNumber] = etag1
			continue
		}
		stat1, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag1)))
		if err != nil {
			return result, toObjectErr(errors.Trace(err))
		}
		stat2, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag2)))
		if err != nil {
			return result, toObjectErr(errors.Trace(err))
		}
		if stat1.ModTime().After(stat2.ModTime()) {
			partsMap[partNumber] = etag1
		}
	}
	var parts []PartInfo
	for partNumber, etag := range partsMap {
		parts = append(parts, PartInfo{PartNumber: partNumber, ETag: etag})
	}
	sort.SliceStable(parts, func(i int, j int) bool {
		return parts[i].PartNumber < parts[j].PartNumber
	})
	i := 0
	if partNumberMarker != 0 {
		// If the marker was set, skip the entries till the marker.
		for _, part := range parts {
			i++
			if part.PartNumber == partNumberMarker {
				break
			}
		}
	}

	partsCount := 0
	for partsCount < maxParts && i < len(parts) {
		result.Parts = append(result.Parts, parts[i])
		i++
		partsCount++
	}
	if i < len(parts) {
		result.IsTruncated = true
		if partsCount != 0 {
			result.NextPartNumberMarker = result.Parts[partsCount-1].PartNumber
		}
	}
	for i, part := range result.Parts {
		stat, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)))
		if err != nil {
			return result, toObjectErr(errors.Trace(err))
		}
		result.Parts[i].LastModified = stat.ModTime()
		result.Parts[i].Size = stat.Size()
	}
	return result, nil
}

// CompleteMultipartUpload - completes an ongoing multipart
// transaction after receiving all the parts indicated by the client.
// Returns an md5sum calculated by concatenating all the individual
// md5sums of all the parts.
//
// Implements S3 compatible Complete multipart API.
func (fs *fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []CompletePart) (oi ObjectInfo, e error) {
	if err := checkCompleteMultipartArgs(bucket, object, fs); err != nil {
		return oi, toObjectErr(err)
	}

	// Check if an object is present as one of the parent dir.
	if fs.parentDirIsObject(bucket, pathutil.Dir(object)) {
		return oi, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object)
	}

	if _, err := fs.statBucketDir(bucket); err != nil {
		return oi, toObjectErr(err, bucket)
	}

	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
	// Just check if the uploadID exists to avoid copy if it doesn't.
	_, err := fsStatFile(pathJoin(uploadIDDir, fsMetaJSONFile))
	if err != nil {
		if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied {
			return oi, errors.Trace(InvalidUploadID{UploadID: uploadID})
		}
		return oi, toObjectErr(err, bucket, object)
	}

	// Calculate s3 compatible md5sum for complete multipart.
	s3MD5, err := getCompleteMultipartMD5(parts)
	if err != nil {
		return oi, err
	}

	partSize := int64(-1) // Used later to ensure that all parts sizes are same.

	// Validate all parts and then commit to disk.
	for i, part := range parts {
		partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag))
		var fi os.FileInfo
		fi, err = fsStatFile(partPath)
		if err != nil {
			if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied {
				return oi, errors.Trace(InvalidPart{})
			}
			return oi, errors.Trace(err)
		}
		if partSize == -1 {
			partSize = fi.Size()
		}
		if i == len(parts)-1 {
			break
		}

		// All parts except the last part has to be atleast 5MB.
		if !isMinAllowedPartSize(fi.Size()) {
			return oi, errors.Trace(PartTooSmall{
				PartNumber: part.PartNumber,
				PartSize:   fi.Size(),
				PartETag:   part.ETag,
			})
		}

		// TODO: Make necessary changes in future as explained in the below comment.
		// All parts except the last part has to be of same size. We are introducing this
		// check to see if any clients break. If clients do not break then we can optimize
		// multipart PutObjectPart by writing the part at the right offset using pwrite()
		// so that we don't need to do background append at all. i.e by the time we get
		// CompleteMultipartUpload we already have the full file available which can be
		// renamed to the main name-space.
		if partSize != fi.Size() {
			return oi, errors.Trace(PartsSizeUnequal{})
		}
	}

	appendFallback := true // In case background-append did not append the required parts.
	appendFilePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID()))

	// Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend()
	// to take care of the following corner case:
	// 1. The last PutObjectPart triggers go-routine fs.backgroundAppend, this go-routine has not started yet.
	// 2. Now CompleteMultipartUpload gets called which sees that lastPart is not appended and starts appending
	//    from the beginning
	fs.backgroundAppend(bucket, object, uploadID)

	fs.appendFileMapMu.Lock()
	file := fs.appendFileMap[uploadID]
	delete(fs.appendFileMap, uploadID)
	fs.appendFileMapMu.Unlock()

	if file != nil {
		file.Lock()
		defer file.Unlock()
		// Verify that appendFile has all the parts.
		if len(file.parts) == len(parts) {
			for i := range parts {
				if parts[i].ETag != file.parts[i].ETag {
					break
				}
				if parts[i].PartNumber != file.parts[i].PartNumber {
					break
				}
				if i == len(parts)-1 {
					appendFilePath = file.filePath
					appendFallback = false
				}
			}
		}
	}

	if appendFallback {
		fsRemoveFile(file.filePath)
		for _, part := range parts {
			partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag))
			err = mioutil.AppendFile(appendFilePath, partPath)
			if err != nil {
				return oi, toObjectErr(errors.Trace(err))
			}
		}
	}

	// Hold write lock on the object.
	destLock := fs.nsMutex.NewNSLock(bucket, object)
	if err = destLock.GetLock(globalObjectTimeout); err != nil {
		return oi, err
	}
	defer destLock.Unlock()
	fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile)
	metaFile, err := fs.rwPool.Create(fsMetaPath)
	if err != nil {
		return oi, toObjectErr(errors.Trace(err), bucket, object)
	}
	defer metaFile.Close()

	fsMeta := fsMetaV1{}
	// Read saved fs metadata for ongoing multipart.
	fsMetaBuf, err := ioutil.ReadFile(pathJoin(uploadIDDir, fsMetaJSONFile))
	if err != nil {
		return oi, toObjectErr(errors.Trace(err), bucket, object)
	}
	err = json.Unmarshal(fsMetaBuf, &fsMeta)
	if err != nil {
		return oi, toObjectErr(errors.Trace(err), bucket, object)
	}
	// Save additional metadata.
	if len(fsMeta.Meta) == 0 {
		fsMeta.Meta = make(map[string]string)
	}
	fsMeta.Meta["etag"] = s3MD5
	if _, err = fsMeta.WriteTo(metaFile); err != nil {
		return oi, toObjectErr(errors.Trace(err), bucket, object)
	}

	err = fsRenameFile(appendFilePath, pathJoin(fs.fsPath, bucket, object))
	if err != nil {
		return oi, toObjectErr(errors.Trace(err), bucket, object)
	}
	fsRemoveAll(uploadIDDir)
	fi, err := fsStatFile(pathJoin(fs.fsPath, bucket, object))
	if err != nil {
		return oi, toObjectErr(errors.Trace(err), bucket, object)
	}

	return fsMeta.ToObjectInfo(bucket, object, fi), nil
}

// AbortMultipartUpload - aborts an ongoing multipart operation
// signified by the input uploadID. This is an atomic operation
// doesn't require clients to initiate multiple such requests.
//
// All parts are purged from all disks and reference to the uploadID
// would be removed from the system, rollback is not possible on this
// operation.
//
// Implements S3 compatible Abort multipart API, slight difference is
// that this is an atomic idempotent operation. Subsequent calls have
// no affect and further requests to the same uploadID would not be
// honored.
func (fs *fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
	if err := checkAbortMultipartArgs(bucket, object, fs); err != nil {
		return err
	}

	if _, err := fs.statBucketDir(bucket); err != nil {
		return toObjectErr(errors.Trace(err), bucket)
	}

	fs.appendFileMapMu.Lock()
	delete(fs.appendFileMap, uploadID)
	fs.appendFileMapMu.Unlock()

	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
	// Just check if the uploadID exists to avoid copy if it doesn't.
	_, err := fsStatFile(pathJoin(uploadIDDir, fsMetaJSONFile))
	if err != nil {
		if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied {
			return errors.Trace(InvalidUploadID{UploadID: uploadID})
		}
		return toObjectErr(errors.Trace(err), bucket, object)
	}
	// Ignore the error returned as Windows fails to remove directory if a file in it
	// is Open()ed by the backgroundAppend()
	fsRemoveAll(uploadIDDir)

	return nil
}

// Removes multipart uploads if any older than `expiry` duration
// on all buckets for every `cleanupInterval`, this function is
// blocking and should be run in a go-routine.
func (fs *fsObjects) cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
	ticker := time.NewTicker(cleanupInterval)
	for {
		select {
		case <-doneCh:
			// Stop the timer.
			ticker.Stop()
			return
		case <-ticker.C:
			now := time.Now()
			entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
			if err != nil {
				continue
			}
			for _, entry := range entries {
				uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
				if err != nil {
					continue
				}
				for _, uploadID := range uploadIDs {
					fi, err := fsStatDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
					if err != nil {
						continue
					}
					if now.Sub(fi.ModTime()) > expiry {
						fsRemoveAll(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
					}
				}
			}
		}
	}
}