// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/klauspost/readahead"
	"github.com/minio/minio-go/v7/pkg/set"
	"github.com/minio/minio/internal/config/storageclass"
	"github.com/minio/minio/internal/crypto"
	"github.com/minio/minio/internal/hash"
	xhttp "github.com/minio/minio/internal/http"
	xioutil "github.com/minio/minio/internal/ioutil"
	"github.com/minio/minio/internal/logger"
	"github.com/minio/pkg/v3/mimedb"
	"github.com/minio/pkg/v3/sync/errgroup"
)

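// getUploadIDDir returns the on-disk directory for an uploadID. Client-facing
// upload IDs are of the form base64_url("<deploymentID>.<uploadUUID>"); the
// deployment ID prefix is stripped so that only the uploadUUID is used as the
// directory name. Upload IDs that do not decode, or that lack the deployment
// ID prefix, are used verbatim.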
func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string {
	uploadUUID := uploadID
	uploadBytes, err := base64.RawURLEncoding.DecodeString(uploadID)
	if err == nil {
		slc := strings.SplitN(string(uploadBytes), ".", 2)
		if len(slc) == 2 {
			uploadUUID = slc[1]
		}
	}
	return pathJoin(er.getMultipartSHADir(bucket, object), uploadUUID)
}

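// getMultipartSHADir returns the SHA256 hash of "bucket/object", used as the
// parent directory for all upload IDs of that object under the multipart
// metadata bucket.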
func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
	return getSHA256Hash([]byte(pathJoin(bucket, object)))
}

// checkUploadIDExists - verify if a given uploadID exists and is valid.
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string, write bool) (fi FileInfo, metArr []FileInfo, err error) {
	defer func() {
		if errors.Is(err, errFileNotFound) {
			err = errUploadIDNotFound
		}
	}()

	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)

	storageDisks := er.getDisks()

	// Read metadata associated with the object from all disks.
	partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, minioMetaMultipartBucket,
		uploadIDPath, "", false, false)

	readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
	if err != nil {
		return fi, nil, err
	}

	if readQuorum < 0 {
		return fi, nil, errErasureReadQuorum
	}

	if writeQuorum < 0 {
		return fi, nil, errErasureWriteQuorum
	}

	quorum := readQuorum
	if write {
		quorum = writeQuorum
	}

	// List all online disks.
	_, modTime, etag := listOnlineDisks(storageDisks, partsMetadata, errs, quorum)

	if write {
		err = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
	} else {
		err = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
	}
	if err != nil {
		return fi, nil, err
	}

	// Pick one from the first valid metadata.
	fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, etag, quorum)
	return fi, partsMetadata, err
}

// cleanupMultipartPath removes all extraneous files and parts from the multipart folder; it is invoked per CompleteMultipartUpload.
// Do not use this function outside of completeMultipartUpload().
func (er erasureObjects) cleanupMultipartPath(ctx context.Context, paths ...string) {
	storageDisks := er.getDisks()

	g := errgroup.WithNErrs(len(storageDisks))
	for index, disk := range storageDisks {
		if disk == nil {
			continue
		}
		index := index
		g.Go(func() error {
			_ = storageDisks[index].DeleteBulk(ctx, minioMetaMultipartBucket, paths...)
			return nil
		}, index)
	}
	g.Wait()
}

// Clean up the old multipart uploads. Should be run in a goroutine.
func (er erasureObjects) cleanupStaleUploads(ctx context.Context) {
	// run multiple cleanups local to this server.
	var wg sync.WaitGroup
	for _, disk := range er.getLocalDisks() {
		if disk != nil {
			wg.Add(1)
			go func(disk StorageAPI) {
				defer wg.Done()
				er.cleanupStaleUploadsOnDisk(ctx, disk)
			}(disk)
		}
	}
	wg.Wait()
}

func (er erasureObjects) deleteAll(ctx context.Context, bucket, prefix string) {
	var wg sync.WaitGroup
	for _, disk := range er.getDisks() {
		if disk == nil {
			continue
		}
		wg.Add(1)
		go func(disk StorageAPI) {
			defer wg.Done()
			disk.Delete(ctx, bucket, prefix, DeleteOptions{
				Recursive: true,
				Immediate: false,
			})
		}(disk)
	}
	wg.Wait()
}

// Remove the old multipart uploads on the given disk.
func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI) {
	drivePath := disk.Endpoint().Path

	readDirFn(pathJoin(drivePath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
		readDirFn(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error {
			uploadIDPath := pathJoin(shaDir, uploadIDDir)
			var modTime time.Time
			// Upload IDs are of the form base64_url(<UUID>x<UnixNano>); the upload time can be extracted from the UnixNano suffix.
			if b64, err := base64.RawURLEncoding.DecodeString(uploadIDDir); err == nil {
				if split := strings.Split(string(b64), "x"); len(split) == 2 {
					t, err := strconv.ParseInt(split[1], 10, 64)
					if err == nil {
						modTime = time.Unix(0, t)
					}
				}
			}
			// Fallback for older uploads without time in the ID.
			if modTime.IsZero() {
				wait := deleteMultipartCleanupSleeper.Timer(ctx)
				fi, err := disk.ReadVersion(ctx, "", minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{})
				if err != nil {
					return nil
				}
				modTime = fi.ModTime
				wait()
			}
			if time.Since(modTime) < globalAPIConfig.getStaleUploadsExpiry() {
				return nil
			}
			w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
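			// The expired upload is moved into minioMetaTmpDeletedBucket under a
			// random UUID; actual removal from disk is handled by the trash cleanup routines.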
			return w.Run(func() error {
				wait := deleteMultipartCleanupSleeper.Timer(ctx)
				pathUUID := mustGetUUID()
				targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
				renameAll(pathJoin(drivePath, minioMetaMultipartBucket, uploadIDPath), targetPath, pathJoin(drivePath, minioMetaBucket))
				wait()
				return nil
			})
		})
		// Get the modtime of the shaDir.
		vi, err := disk.StatVol(ctx, pathJoin(minioMetaMultipartBucket, shaDir))
		if err != nil {
			return nil
		}
		// Modtime is returned in the Created field. See (*xlStorage).StatVol
		if time.Since(vi.Created) < globalAPIConfig.getStaleUploadsExpiry() {
			return nil
		}
		w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
		return w.Run(func() error {
			wait := deleteMultipartCleanupSleeper.Timer(ctx)
			pathUUID := mustGetUUID()
			targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)

			// We are not deleting shaDir recursively here; if shaDir is empty
			// and it is old enough, we can safely delete it.
			Rename(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), targetPath)
			wait()
			return nil
		})
	})

	readDirFn(pathJoin(drivePath, minioMetaTmpBucket), func(tmpDir string, typ os.FileMode) error {
		if strings.HasPrefix(tmpDir, ".trash") {
			// do not remove .trash/ here, it has its own routines
			return nil
		}
		vi, err := disk.StatVol(ctx, pathJoin(minioMetaTmpBucket, tmpDir))
		if err != nil {
			return nil
		}
		w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
		return w.Run(func() error {
			wait := deleteMultipartCleanupSleeper.Timer(ctx)
			if time.Since(vi.Created) > globalAPIConfig.getStaleUploadsExpiry() {
				pathUUID := mustGetUUID()
				targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)

				renameAll(pathJoin(drivePath, minioMetaTmpBucket, tmpDir), targetPath, pathJoin(drivePath, minioMetaBucket))
			}
			wait()
			return nil
		})
	})
}

// ListMultipartUploads - lists all the pending multipart
// uploads for a particular object in a bucket.
//
// Implements a minimal S3 compatible ListMultipartUploads API. We do
// not support prefix-based listing; this is a deliberate simplification
// of the multipart APIs.
// The resulting ListMultipartsInfo structure is marshaled directly into XML.
func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
	auditObjectErasureSet(ctx, "ListMultipartUploads", object, &er)

	result.MaxUploads = maxUploads
	result.KeyMarker = keyMarker
	result.Prefix = object
	result.Delimiter = delimiter

	var uploadIDs []string
	var disk StorageAPI
	disks := er.getOnlineLocalDisks()
	if len(disks) == 0 {
		// No online local disks; fall back to non-healing disks, then to all online disks.
		var ok bool
		if disks, ok = er.getOnlineDisksWithHealing(false); !ok {
			disks = er.getOnlineDisks()
		}
	}

	for _, disk = range disks {
		if disk == nil {
			continue
		}
		if !disk.IsOnline() {
			continue
		}
		uploadIDs, err = disk.ListDir(ctx, bucket, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
		if err != nil {
			if errors.Is(err, errDiskNotFound) {
				continue
			}
			if errors.Is(err, errFileNotFound) {
				return result, nil
			}
			return result, toObjectErr(err, bucket, object)
		}
		break
	}

	for i := range uploadIDs {
		uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
	}

	// S3 spec says uploadIDs should be sorted based on initiated time;
	// we derive the initiated time from the upload ID where possible.
	var uploads []MultipartInfo

	populatedUploadIDs := set.NewStringSet()

	for _, uploadID := range uploadIDs {
		if populatedUploadIDs.Contains(uploadID) {
			continue
		}
		// If present, use time stored in ID.
		startTime := time.Now()
		if split := strings.Split(uploadID, "x"); len(split) == 2 {
			t, err := strconv.ParseInt(split[1], 10, 64)
			if err == nil {
				startTime = time.Unix(0, t)
			}
		}
		uploads = append(uploads, MultipartInfo{
			Bucket:    bucket,
			Object:    object,
			UploadID:  base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID(), uploadID))),
			Initiated: startTime,
		})
		populatedUploadIDs.Add(uploadID)
	}

	sort.Slice(uploads, func(i int, j int) bool {
		return uploads[i].Initiated.Before(uploads[j].Initiated)
	})

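	// Skip entries up to and including uploadIDMarker before collecting results.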
	uploadIndex := 0
	if uploadIDMarker != "" {
		for uploadIndex < len(uploads) {
			if uploads[uploadIndex].UploadID != uploadIDMarker {
				uploadIndex++
				continue
			}
			if uploads[uploadIndex].UploadID == uploadIDMarker {
				uploadIndex++
				break
			}
			uploadIndex++
		}
	}
	for uploadIndex < len(uploads) {
		result.Uploads = append(result.Uploads, uploads[uploadIndex])
		result.NextUploadIDMarker = uploads[uploadIndex].UploadID
		uploadIndex++
		if len(result.Uploads) == maxUploads {
			break
		}
	}

	result.IsTruncated = uploadIndex < len(uploads)

	if !result.IsTruncated {
		result.NextKeyMarker = ""
		result.NextUploadIDMarker = ""
	}

	return result, nil
}

// newMultipartUpload - wrapper for initializing a new multipart
// request; returns a unique upload id.
//
// Internally this function writes the initial `xl.meta` for the
// incoming object at
// '.minio.sys/multipart/<SHA256(bucket/object)>/<uploadID>/' on all the
// disks. This metadata carries information regarding the on-going
// multipart operation(s) on the object.
func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) {
	if opts.CheckPrecondFn != nil {
		if !opts.NoLock {
			ns := er.NewNSLock(bucket, object)
			lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
			if err != nil {
				return nil, err
			}
			ctx = lkctx.Context()
			defer ns.Unlock(lkctx)
			opts.NoLock = true
		}

		obj, err := er.getObjectInfo(ctx, bucket, object, opts)
		if err == nil && opts.CheckPrecondFn(obj) {
			return nil, PreConditionFailed{}
		}
		if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) && !isErrReadQuorum(err) {
			return nil, err
		}
	}

	userDefined := cloneMSS(opts.UserDefined)
	if opts.PreserveETag != "" {
		userDefined["etag"] = opts.PreserveETag
	}
	onlineDisks := er.getDisks()

	// Get parity and data drive count based on storage class metadata
	parityDrives := globalStorageClass.GetParityForSC(userDefined[xhttp.AmzStorageClass])
	if parityDrives < 0 {
		parityDrives = er.defaultParityCount
	}

	if globalStorageClass.AvailabilityOptimized() {
		// If we have offline disks, upgrade the parity count for this object.
		parityOrig := parityDrives

		var offlineDrives int
		for _, disk := range onlineDisks {
			if disk == nil || !disk.IsOnline() {
				parityDrives++
				offlineDrives++
				continue
			}
		}

		if offlineDrives >= (len(onlineDisks)+1)/2 {
			// If offline drives are at least half of the drives,
			// we have no write quorum and shouldn't proceed; just
			// fail at this point.
			return nil, toObjectErr(errErasureWriteQuorum, bucket, object)
		}

		if parityDrives >= len(onlineDisks)/2 {
			parityDrives = len(onlineDisks) / 2
		}

		if parityOrig != parityDrives {
			userDefined[minIOErasureUpgraded] = strconv.Itoa(parityOrig) + "->" + strconv.Itoa(parityDrives)
		}
	}

	dataDrives := len(onlineDisks) - parityDrives

	// We now know the number of blocks this object needs for data and parity.
	// Establish the writeQuorum using this data.
	writeQuorum := dataDrives
	if dataDrives == parityDrives {
		writeQuorum++
	}
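	// For example, with 16 drives and parity 4, dataDrives is 12 and
	// writeQuorum is 12; with 16 drives and parity 8, dataDrives equals
	// parityDrives, so writeQuorum becomes 9.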

	// Initialize parts metadata
	partsMetadata := make([]FileInfo, len(onlineDisks))

	fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)
	fi.VersionID = opts.VersionID
	if opts.Versioned && fi.VersionID == "" {
		fi.VersionID = mustGetUUID()
	}
	fi.DataDir = mustGetUUID()

	if userDefined[ReplicationSsecChecksumHeader] != "" {
		fi.Checksum, _ = base64.StdEncoding.DecodeString(userDefined[ReplicationSsecChecksumHeader])
		delete(userDefined, ReplicationSsecChecksumHeader)
	}

	// Initialize erasure metadata.
	for index := range partsMetadata {
		partsMetadata[index] = fi
	}

	// Guess content-type from the extension if possible.
	if userDefined["content-type"] == "" {
		userDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
	}

	// if storageClass is standard no need to save it as part of metadata.
	if userDefined[xhttp.AmzStorageClass] == storageclass.STANDARD {
		delete(userDefined, xhttp.AmzStorageClass)
	}

	if opts.WantChecksum != nil && opts.WantChecksum.Type.IsSet() {
		userDefined[hash.MinIOMultipartChecksum] = opts.WantChecksum.Type.String()
	}

	modTime := opts.MTime
	if opts.MTime.IsZero() {
		modTime = UTCNow()
	}

	onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(onlineDisks, partsMetadata, fi)

	// Fill all the necessary metadata.
	// Update `xl.meta` content on each disk.
	for index := range partsMetadata {
		partsMetadata[index].Fresh = true
		partsMetadata[index].ModTime = modTime
		partsMetadata[index].Metadata = userDefined
	}
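	// The on-disk upload directory name embeds the creation time
	// (<UUID>x<UnixNano>), while the client-facing uploadID additionally
	// embeds the deployment ID: base64_url("<deploymentID>.<uploadUUID>").
	// The time suffix lets stale-upload cleanup derive the upload time
	// without reading metadata.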
	uploadUUID := fmt.Sprintf("%sx%d", mustGetUUID(), modTime.UnixNano())
	uploadID := base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID(), uploadUUID)))
	uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID)

	// Write updated `xl.meta` to all disks.
	if _, err := writeAllMetadata(ctx, onlineDisks, bucket, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
		return nil, toObjectErr(err, bucket, object)
	}

	return &NewMultipartUploadResult{
		UploadID:     uploadID,
		ChecksumAlgo: userDefined[hash.MinIOMultipartChecksum],
	}, nil
}

// NewMultipartUpload - initializes a new multipart upload and returns a
// unique upload id. The id encodes a freshly generated UUID, so every
// request receives a distinct value.
//
// Implements S3 compatible initiate multipart API.
func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) {
	if !opts.NoAuditLog {
		auditObjectErasureSet(ctx, "NewMultipartUpload", object, &er)
	}

	return er.newMultipartUpload(ctx, bucket, object, opts)
}

// renamePart - renames a multipart part to its relevant location under uploadID.
func (er erasureObjects) renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, optsMeta []byte, writeQuorum int) ([]StorageAPI, error) {
	g := errgroup.WithNErrs(len(disks))

	// Rename file on all underlying storage disks.
	for index := range disks {
		index := index
		g.Go(func() error {
			if disks[index] == nil {
				return errDiskNotFound
			}
			return disks[index].RenamePart(ctx, srcBucket, srcEntry, dstBucket, dstEntry, optsMeta)
		}, index)
	}

	// Wait for all renames to finish.
	errs := g.Wait()

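	// RenamePart writes both the part file and its companion ".meta"
	// (the serialized ObjectPartInfo); clean up both if write quorum was not met.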
	paths := []string{
		dstEntry,
		dstEntry + ".meta",
	}

	err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
	if err != nil {
		er.cleanupMultipartPath(ctx, paths...)
	}

	// We can safely allow RenamePart errors up to len(er.getDisks()) - writeQuorum;
	// otherwise return failure. Successful renames are cleaned up above when quorum is not met.
	return evalDisks(disks, errs), err
}

// PutObjectPart - reads the incoming stream and internally erasure codes
// it. This call is similar to a single put operation but it is part
// of the multipart transaction.
//
// Implements S3 compatible Upload Part API.
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
	if !opts.NoAuditLog {
		auditObjectErasureSet(ctx, "PutObjectPart", object, &er)
	}

	data := r.Reader
	// Validate the input data size; it can never be less than -1 (-1 means the size is unknown).
	if data.Size() < -1 {
		bugLogIf(ctx, errInvalidArgument, logger.ErrorKind)
		return pi, toObjectErr(errInvalidArgument)
	}

	// Read lock for upload id.
	// Only held while reading the upload metadata.
	uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
	rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		return PartInfo{}, err
	}
	rctx := rlkctx.Context()
	defer uploadIDRLock.RUnlock(rlkctx)

	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
	// Validates if upload ID exists.
	fi, _, err := er.checkUploadIDExists(rctx, bucket, object, uploadID, true)
	if err != nil {
		if errors.Is(err, errVolumeNotFound) {
			return pi, toObjectErr(err, bucket)
		}
		return pi, toObjectErr(err, bucket, object, uploadID)
	}

	// Write lock for this part ID; only hold it while we are reading from the
	// stream, to avoid any concurrent updates.
	//
	// Must be held throughout this call.
	partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
	plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return PartInfo{}, err
	}

	ctx = plkctx.Context()
	defer partIDLock.Unlock(plkctx)

	onlineDisks := er.getDisks()
	writeQuorum := fi.WriteQuorum(er.defaultWQuorum())

	if cs := fi.Metadata[hash.MinIOMultipartChecksum]; cs != "" {
		if r.ContentCRCType().String() != cs {
			return pi, InvalidArgument{
				Bucket: bucket,
				Object: fi.Name,
				Err:    fmt.Errorf("checksum missing, want %q, got %q", cs, r.ContentCRCType().String()),
			}
		}
	}
	onlineDisks = shuffleDisks(onlineDisks, fi.Erasure.Distribution)

	// Need a unique name for the part being written in minioMetaTmpBucket to
	// accommodate concurrent PutObjectPart requests.

	partSuffix := fmt.Sprintf("part.%d", partID)
	// Random UUID and timestamp for temporary part file.
	tmpPart := fmt.Sprintf("%sx%d", mustGetUUID(), time.Now().UnixNano())
	tmpPartPath := pathJoin(tmpPart, partSuffix)

	// Delete the temporary object part. If PutObjectPart succeeds, there would be nothing to delete.
	defer func() {
		if countOnlineDisks(onlineDisks) != len(onlineDisks) {
			er.deleteAll(context.Background(), minioMetaTmpBucket, tmpPart)
		}
	}()

	erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
	if err != nil {
		return pi, toObjectErr(err, bucket, object)
	}

	// Fetch a buffer for I/O; reuse one from the pool when possible, otherwise allocate a new one.
	var buffer []byte
	switch size := data.Size(); {
	case size == 0:
		buffer = make([]byte, 1) // Allocate at least a byte to reach EOF
	case size == -1:
		if size := data.ActualSize(); size > 0 && size < fi.Erasure.BlockSize {
			// Account for padding, forced compression overhead, and encryption overhead.
			buffer = make([]byte, data.ActualSize()+256+32+32, data.ActualSize()*2+512)
		} else {
			buffer = globalBytePoolCap.Load().Get()
			defer globalBytePoolCap.Load().Put(buffer)
		}
	case size >= fi.Erasure.BlockSize:
		buffer = globalBytePoolCap.Load().Get()
		defer globalBytePoolCap.Load().Put(buffer)
	case size < fi.Erasure.BlockSize:
		// No need to allocate fully fi.Erasure.BlockSize buffer if the incoming data is smaller.
		buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
	}

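	// Erasure encoding consumes the input one block at a time, so the staging
	// buffer never needs to exceed the erasure block size.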
	if len(buffer) > int(fi.Erasure.BlockSize) {
		buffer = buffer[:fi.Erasure.BlockSize]
	}
	writers := make([]io.Writer, len(onlineDisks))
	for i, disk := range onlineDisks {
		if disk == nil {
			continue
		}
		writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
	}

	toEncode := io.Reader(data)
	if data.Size() > bigFileThreshold {
		// Add input readahead.
		// We use 2 buffers, so we always have a full buffer of input.
		pool := globalBytePoolCap.Load()
		bufA := pool.Get()
		bufB := pool.Get()
		defer pool.Put(bufA)
		defer pool.Put(bufB)
		ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
		if err == nil {
			toEncode = ra
			defer ra.Close()
		}
	}

	n, err := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum)
	closeBitrotWriters(writers)
	if err != nil {
		return pi, toObjectErr(err, bucket, object)
	}

	// Should return IncompleteBody{} error when reader has fewer bytes
	// than specified in request header.
	if n < data.Size() {
		return pi, IncompleteBody{Bucket: bucket, Object: object}
	}

	for i := range writers {
		if writers[i] == nil {
			onlineDisks[i] = nil
		}
	}

	// Rename temporary part file to its final location.
	partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)

	md5hex := r.MD5CurrentHexString()
	if opts.PreserveETag != "" {
		md5hex = opts.PreserveETag
	}

	var index []byte
	if opts.IndexCB != nil {
		index = opts.IndexCB()
	}

	partInfo := ObjectPartInfo{
		Number:     partID,
		ETag:       md5hex,
		Size:       n,
		ActualSize: data.ActualSize(),
		ModTime:    UTCNow(),
		Index:      index,
		Checksums:  r.ContentCRC(),
	}

	partFI, err := partInfo.MarshalMsg(nil)
	if err != nil {
		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
	}

	onlineDisks, err = er.renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, partFI, writeQuorum)
	if err != nil {
		if errors.Is(err, errFileNotFound) {
			// An in-quorum errFileNotFound means that client stream
			// prematurely closed and we do not find any xl.meta or
			// part.1's - in such a scenario we must return as if client
			// disconnected. This means that erasure.Encode() CreateFile()
			// did not do anything.
			return pi, IncompleteBody{Bucket: bucket, Object: object}
		}

		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
	}

	// Return success.
	return PartInfo{
		PartNumber:     partInfo.Number,
		ETag:           partInfo.ETag,
		LastModified:   partInfo.ModTime,
		Size:           partInfo.Size,
		ActualSize:     partInfo.ActualSize,
		ChecksumCRC32:  partInfo.Checksums["CRC32"],
		ChecksumCRC32C: partInfo.Checksums["CRC32C"],
		ChecksumSHA1:   partInfo.Checksums["SHA1"],
		ChecksumSHA256: partInfo.Checksums["SHA256"],
	}, nil
}

// GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used
// by callers to verify object states
// - encrypted
// - compressed
// Does not contain currently uploaded parts by design.
func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
	if !opts.NoAuditLog {
		auditObjectErasureSet(ctx, "GetMultipartInfo", object, &er)
	}

	result := MultipartInfo{
		Bucket:   bucket,
		Object:   object,
		UploadID: uploadID,
	}

	uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
	lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		return MultipartInfo{}, err
	}
	ctx = lkctx.Context()
	defer uploadIDLock.RUnlock(lkctx)

	fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
	if err != nil {
		if errors.Is(err, errVolumeNotFound) {
			return result, toObjectErr(err, bucket)
		}
		return result, toObjectErr(err, bucket, object, uploadID)
	}

	result.UserDefined = cloneMSS(fi.Metadata)
	return result, nil
}

func (er erasureObjects) listParts(ctx context.Context, onlineDisks []StorageAPI, partPath string, readQuorum int) ([]int, error) {
	g := errgroup.WithNErrs(len(onlineDisks))

	objectParts := make([][]string, len(onlineDisks))
	// List uploaded parts from drives.
	for index := range onlineDisks {
		index := index
		g.Go(func() (err error) {
			if onlineDisks[index] == nil {
				return errDiskNotFound
			}
			objectParts[index], err = onlineDisks[index].ListDir(ctx, minioMetaMultipartBucket, minioMetaMultipartBucket, partPath, -1)
			return err
		}, index)
	}

	if err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum); err != nil {
		return nil, err
	}

	partQuorumMap := make(map[int]int)
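	// partQuorumMap counts, for each part number, how many drives report both
	// part.N and its companion part.N.meta; only parts seen on at least
	// readQuorum drives are returned.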
	for _, driveParts := range objectParts {
		partsWithMetaCount := make(map[int]int, len(driveParts))
		// part files can be either part.N or part.N.meta
		for _, partPath := range driveParts {
			var partNum int
			if _, err := fmt.Sscanf(partPath, "part.%d", &partNum); err == nil {
				partsWithMetaCount[partNum]++
				continue
			}
			if _, err := fmt.Sscanf(partPath, "part.%d.meta", &partNum); err == nil {
				partsWithMetaCount[partNum]++
			}
		}
		// Include only part.N.meta files with corresponding part.N
		for partNum, cnt := range partsWithMetaCount {
			if cnt < 2 {
				continue
			}
			partQuorumMap[partNum]++
		}
	}

	var partNums []int
	for partNum, count := range partQuorumMap {
		if count < readQuorum {
			continue
		}
		partNums = append(partNums, partNum)
	}

	sort.Ints(partNums)
	return partNums, nil
}

// ListObjectParts - lists all previously uploaded parts for a given
// object and uploadID.  Takes additional input of part-number-marker
// to indicate where the listing should begin from.
//
// Implements S3 compatible ListObjectParts API. The resulting
// ListPartsInfo structure is marshaled directly into XML and
// replied back to the client.
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
	if !opts.NoAuditLog {
		auditObjectErasureSet(ctx, "ListObjectParts", object, &er)
	}

	uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
	lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		return ListPartsInfo{}, err
	}
	ctx = lkctx.Context()
	defer uploadIDLock.RUnlock(lkctx)

	fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
	if err != nil {
		return result, toObjectErr(err, bucket, object, uploadID)
	}

	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
	if partNumberMarker < 0 {
		partNumberMarker = 0
	}

	// Limit output to maxPartsList.
	if maxParts > maxPartsList {
		maxParts = maxPartsList
	}

	// Populate the result stub.
	result.Bucket = bucket
	result.Object = object
	result.UploadID = uploadID
	result.MaxParts = maxParts
	result.PartNumberMarker = partNumberMarker
	result.UserDefined = cloneMSS(fi.Metadata)
	result.ChecksumAlgorithm = fi.Metadata[hash.MinIOMultipartChecksum]

	if maxParts == 0 {
		return result, nil
	}

	onlineDisks := er.getDisks()
	readQuorum := fi.ReadQuorum(er.defaultRQuorum())
	// Read Part info for all parts
	partPath := pathJoin(uploadIDPath, fi.DataDir) + SlashSeparator

	// List parts in quorum
	partNums, err := er.listParts(ctx, onlineDisks, partPath, readQuorum)
	if err != nil {
		// This means that fi.DataDir is not yet populated, so we
		// return an empty response.
		if errors.Is(err, errFileNotFound) {
			return result, nil
		}
		return result, toObjectErr(err, bucket, object, uploadID)
	}

	if len(partNums) == 0 {
		return result, nil
	}

	start := objectPartIndexNums(partNums, partNumberMarker)
	if start != -1 {
		partNums = partNums[start+1:]
	}

	result.Parts = make([]PartInfo, 0, len(partNums))
	partMetaPaths := make([]string, len(partNums))
	for i, part := range partNums {
		partMetaPaths[i] = pathJoin(partPath, fmt.Sprintf("part.%d.meta", part))
	}

	// Read parts in quorum
	objParts, err := readParts(ctx, onlineDisks, minioMetaMultipartBucket, partMetaPaths,
		partNums, readQuorum)
	if err != nil {
		return result, toObjectErr(err, bucket, object, uploadID)
	}

	count := maxParts
	for _, objPart := range objParts {
		result.Parts = append(result.Parts, PartInfo{
			PartNumber:     objPart.Number,
			LastModified:   objPart.ModTime,
			ETag:           objPart.ETag,
			Size:           objPart.Size,
			ActualSize:     objPart.ActualSize,
			ChecksumCRC32:  objPart.Checksums["CRC32"],
			ChecksumCRC32C: objPart.Checksums["CRC32C"],
			ChecksumSHA1:   objPart.Checksums["SHA1"],
			ChecksumSHA256: objPart.Checksums["SHA256"],
		})
		count--
		if count == 0 {
			break
		}
	}

	if len(objParts) > len(result.Parts) {
		result.IsTruncated = true
		// Make sure to fill next part number marker if IsTruncated is true for subsequent listing.
		result.NextPartNumberMarker = result.Parts[len(result.Parts)-1].PartNumber
	}

	return result, nil
}

func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaPaths []string, partNumbers []int, readQuorum int) ([]ObjectPartInfo, error) {
	g := errgroup.WithNErrs(len(disks))

	objectPartInfos := make([][]*ObjectPartInfo, len(disks))
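	// objectPartInfos[diskIndex][pathIndex] holds the part info each disk
	// returned for the requested part meta paths; a missing or short result
	// marks that disk's entry as unusable for quorum counting below.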
	// Read part metadata from all underlying storage disks.
	for index := range disks {
		index := index
		g.Go(func() (err error) {
			if disks[index] == nil {
				return errDiskNotFound
			}
			objectPartInfos[index], err = disks[index].ReadParts(ctx, bucket, partMetaPaths...)
			return err
		}, index)
	}

	if err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum); err != nil {
		return nil, err
	}

	partInfosInQuorum := make([]ObjectPartInfo, len(partMetaPaths))
	for pidx := range partMetaPaths {
		// partMetaQuorumMap uses
		//  - path/to/part.N as key to collate errors from failed drives.
		//  - part ETag to collate part metadata
		partMetaQuorumMap := make(map[string]int, len(partNumbers))
		var pinfos []*ObjectPartInfo
		for idx := range disks {
			if len(objectPartInfos[idx]) != len(partMetaPaths) {
				partMetaQuorumMap[partMetaPaths[pidx]]++
				continue
			}

			pinfo := objectPartInfos[idx][pidx]
			if pinfo != nil && pinfo.ETag != "" {
				pinfos = append(pinfos, pinfo)
				partMetaQuorumMap[pinfo.ETag]++
				continue
			}
			partMetaQuorumMap[partMetaPaths[pidx]]++
		}

		var maxQuorum int
		var maxETag string
		var maxPartMeta string
		for etag, quorum := range partMetaQuorumMap {
			if maxQuorum < quorum {
				maxQuorum = quorum
				maxETag = etag
				maxPartMeta = etag
			}
		}
		// found is a representative ObjectPartInfo which either has the maximally occurring ETag or an error.
		var found *ObjectPartInfo
		for _, pinfo := range pinfos {
			if pinfo == nil {
				continue
			}
			if maxETag != "" && pinfo.ETag == maxETag {
				found = pinfo
				break
			}
			if pinfo.ETag == "" && maxPartMeta != "" && path.Base(maxPartMeta) == fmt.Sprintf("part.%d.meta", pinfo.Number) {
				found = pinfo
				break
			}
		}

		if found != nil && found.ETag != "" && partMetaQuorumMap[maxETag] >= readQuorum {
			partInfosInQuorum[pidx] = *found
			continue
		}
		partInfosInQuorum[pidx] = ObjectPartInfo{
			Number: partNumbers[pidx],
			Error: InvalidPart{
				PartNumber: partNumbers[pidx],
			}.Error(),
		}

	}
	return partInfosInQuorum, nil
}

func objPartToPartErr(part ObjectPartInfo) error {
	if strings.Contains(part.Error, "file not found") {
		return InvalidPart{PartNumber: part.Number}
	}
	if strings.Contains(part.Error, "Specified part could not be found") {
		return InvalidPart{PartNumber: part.Number}
	}
	if strings.Contains(part.Error, errErasureReadQuorum.Error()) {
		return errErasureReadQuorum
	}
	return errors.New(part.Error)
}

// CompleteMultipartUpload - completes an ongoing multipart
// transaction after receiving all the parts indicated by the client.
// Returns an md5sum calculated by concatenating all the individual
// md5sums of all the parts.
//
// Implements S3 compatible Complete multipart API.
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
	if !opts.NoAuditLog {
		auditObjectErasureSet(ctx, "CompleteMultipartUpload", object, &er)
	}

	if opts.CheckPrecondFn != nil {
		if !opts.NoLock {
			ns := er.NewNSLock(bucket, object)
			lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
			if err != nil {
				return ObjectInfo{}, err
			}
			ctx = lkctx.Context()
			defer ns.Unlock(lkctx)
			opts.NoLock = true
		}

		obj, err := er.getObjectInfo(ctx, bucket, object, opts)
		if err == nil && opts.CheckPrecondFn(obj) {
			return ObjectInfo{}, PreConditionFailed{}
		}
		if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) && !isErrReadQuorum(err) {
			return ObjectInfo{}, err
		}
	}

	// Hold write locks to verify uploaded parts; this also disallows any
	// parallel PutObjectPart() requests.
	uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
	wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return oi, err
	}
	ctx = wlkctx.Context()
	defer uploadIDLock.Unlock(wlkctx)

	fi, partsMetadata, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true)
	if err != nil {
		if errors.Is(err, errVolumeNotFound) {
			return oi, toObjectErr(err, bucket)
		}
		return oi, toObjectErr(err, bucket, object, uploadID)
	}

	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
	onlineDisks := er.getDisks()
	writeQuorum := fi.WriteQuorum(er.defaultWQuorum())
	readQuorum := fi.ReadQuorum(er.defaultRQuorum())

	// Read Part info for all parts
	partPath := pathJoin(uploadIDPath, fi.DataDir) + SlashSeparator
	partMetaPaths := make([]string, len(parts))
	partNumbers := make([]int, len(parts))
	for idx, part := range parts {
		partMetaPaths[idx] = pathJoin(partPath, fmt.Sprintf("part.%d.meta", part.PartNumber))
		partNumbers[idx] = part.PartNumber
	}

	partInfoFiles, err := readParts(ctx, onlineDisks, minioMetaMultipartBucket, partMetaPaths, partNumbers, readQuorum)
	if err != nil {
		return oi, err
	}

	if len(partInfoFiles) != len(parts) {
		// Should only happen through internal error
		err := fmt.Errorf("unexpected part result count: %d, want %d", len(partInfoFiles), len(parts))
		bugLogIf(ctx, err)
		return oi, toObjectErr(err, bucket, object)
	}

	// Checksum type set when upload started.
	var checksumType hash.ChecksumType
	if cs := fi.Metadata[hash.MinIOMultipartChecksum]; cs != "" {
		checksumType = hash.NewChecksumType(cs)
		if opts.WantChecksum != nil && !opts.WantChecksum.Type.Is(checksumType) {
			return oi, InvalidArgument{
				Bucket: bucket,
				Object: fi.Name,
				Err:    fmt.Errorf("checksum type mismatch"),
			}
		}
	}

	var checksumCombined []byte

	// However, in case of encryption, the persisted part ETags don't match
	// what we have sent to the client during PutObjectPart. The reason is
	// that ETags are encrypted. Hence, the client will send a list of complete
	// part ETags of which none can match the ETag of any part. For example
	//   ETag (client):          30902184f4e62dd8f98f0aaff810c626
	//   ETag (server-internal): 20000f00ce5dc16e3f3b124f586ae1d88e9caa1c598415c2759bbb50e84a59f630902184f4e62dd8f98f0aaff810c626
	//
	// Therefore, we adjust all ETags sent by the client to match what is stored
	// on the backend.
	kind, _ := crypto.IsEncrypted(fi.Metadata)

	var objectEncryptionKey []byte
	switch kind {
	case crypto.SSEC:
		if checksumType.IsSet() {
			if opts.EncryptFn == nil {
				return oi, crypto.ErrMissingCustomerKey
			}
			baseKey := opts.EncryptFn("", nil)
			if len(baseKey) != 32 {
				return oi, crypto.ErrInvalidCustomerKey
			}
			objectEncryptionKey, err = decryptObjectMeta(baseKey, bucket, object, fi.Metadata)
			if err != nil {
				return oi, err
			}
		}
	case crypto.S3, crypto.S3KMS:
		objectEncryptionKey, err = decryptObjectMeta(nil, bucket, object, fi.Metadata)
		if err != nil {
			return oi, err
		}
	}
	if len(objectEncryptionKey) == 32 {
		var key crypto.ObjectKey
		copy(key[:], objectEncryptionKey)
		opts.EncryptFn = metadataEncrypter(key)
	}

	for idx, part := range partInfoFiles {
		if part.Error != "" {
			err = objPartToPartErr(part)
			bugLogIf(ctx, err)
			return oi, err
		}

		if parts[idx].PartNumber != part.Number {
			internalLogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", parts[idx].PartNumber, parts[idx].PartNumber, part.Number))
			return oi, InvalidPart{
				PartNumber: part.Number,
			}
		}

		// Add the current part.
		fi.AddObjectPart(part.Number, part.ETag, part.Size, part.ActualSize, part.ModTime, part.Index, part.Checksums)
	}

	// Calculate full object size.
	var objectSize int64

	// Calculate consolidated actual size.
	var objectActualSize int64

	// Order online disks in accordance with distribution order.
	// Order parts metadata in accordance with distribution order.
	onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi)

	// Save current erasure metadata for validation.
	currentFI := fi

	// Allocate parts similar to incoming slice.
	fi.Parts = make([]ObjectPartInfo, len(parts))

	// Validate each part and then commit to disk.
	for i, part := range parts {
		partIdx := objectPartIndex(currentFI.Parts, part.PartNumber)
		// The requested part number must be present among the uploaded parts.
		if partIdx == -1 {
			invp := InvalidPart{
				PartNumber: part.PartNumber,
				GotETag:    part.ETag,
			}
			return oi, invp
		}
		expPart := currentFI.Parts[partIdx]

		// ensure that part ETag is canonicalized to strip off extraneous quotes
		part.ETag = canonicalizeETag(part.ETag)
		expETag := tryDecryptETag(objectEncryptionKey, expPart.ETag, kind == crypto.S3)
		if expETag != part.ETag {
			invp := InvalidPart{
				PartNumber: part.PartNumber,
				ExpETag:    expETag,
				GotETag:    part.ETag,
			}
			return oi, invp
		}

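		// When a checksum type was set at upload start, verify the client-supplied
		// per-part checksum against the recorded one and accumulate the raw
		// checksums for the final composite object checksum.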
		if checksumType.IsSet() {
			crc := expPart.Checksums[checksumType.String()]
			if crc == "" {
				return oi, InvalidPart{
					PartNumber: part.PartNumber,
				}
			}
			wantCS := map[string]string{
				hash.ChecksumCRC32.String():  part.ChecksumCRC32,
				hash.ChecksumCRC32C.String(): part.ChecksumCRC32C,
				hash.ChecksumSHA1.String():   part.ChecksumSHA1,
				hash.ChecksumSHA256.String(): part.ChecksumSHA256,
			}
			if wantCS[checksumType.String()] != crc {
				return oi, InvalidPart{
					PartNumber: part.PartNumber,
					ExpETag:    wantCS[checksumType.String()],
					GotETag:    crc,
				}
			}
			cs := hash.NewChecksumString(checksumType.String(), crc)
			if !cs.Valid() {
				return oi, InvalidPart{
					PartNumber: part.PartNumber,
				}
			}
			checksumCombined = append(checksumCombined, cs.Raw...)
		}

		// All parts except the last part have to be at least 5MB.
		if (i < len(parts)-1) && !isMinAllowedPartSize(currentFI.Parts[partIdx].ActualSize) {
			return oi, PartTooSmall{
				PartNumber: part.PartNumber,
				PartSize:   expPart.ActualSize,
				PartETag:   part.ETag,
			}
		}

		// Save for total object size.
		objectSize += expPart.Size

		// Save the consolidated actual size.
		objectActualSize += expPart.ActualSize

		// Add incoming parts.
		fi.Parts[i] = ObjectPartInfo{
			Number:     part.PartNumber,
			Size:       expPart.Size,
			ActualSize: expPart.ActualSize,
			ModTime:    expPart.ModTime,
			Index:      expPart.Index,
			Checksums:  nil, // Not transferred since we do not need it.
		}
	}

	if opts.WantChecksum != nil {
		err := opts.WantChecksum.Matches(checksumCombined, len(parts))
		if err != nil {
			return oi, err
		}
	}

	if !opts.NoLock {
		lk := er.NewNSLock(bucket, object)
		lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
		if err != nil {
			return ObjectInfo{}, err
		}
		ctx = lkctx.Context()
		defer lk.Unlock(lkctx)
	}

	// Accept encrypted checksum from incoming request.
	if opts.UserDefined[ReplicationSsecChecksumHeader] != "" {
		if v, err := base64.StdEncoding.DecodeString(opts.UserDefined[ReplicationSsecChecksumHeader]); err == nil {
			fi.Checksum = v
		}
		delete(opts.UserDefined, ReplicationSsecChecksumHeader)
	}

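	// The final object checksum is a checksum-of-checksums computed over the
	// concatenated raw part checksums, marked as a multipart checksum.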
	if checksumType.IsSet() {
		checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart
		cs := hash.NewChecksumFromData(checksumType, checksumCombined)
		fi.Checksum = cs.AppendTo(nil, checksumCombined)
		if opts.EncryptFn != nil {
			fi.Checksum = opts.EncryptFn("object-checksum", fi.Checksum)
		}
	}
	delete(fi.Metadata, hash.MinIOMultipartChecksum) // Not needed in final object.

	// Save the final object size and modtime.
	fi.Size = objectSize
	fi.ModTime = opts.MTime
	if opts.MTime.IsZero() {
		fi.ModTime = UTCNow()
	}

	// Save the successfully calculated md5sum.
	// For a replica, newMultipartUpload would have already set the replication ETag.
	if fi.Metadata["etag"] == "" {
		if opts.UserDefined["etag"] != "" {
			fi.Metadata["etag"] = opts.UserDefined["etag"]
		} else { // fallback if not already calculated in handler.
			fi.Metadata["etag"] = getCompleteMultipartMD5(parts)
		}
	}

	// Save the consolidated actual size.
	if opts.ReplicationRequest {
		if v := opts.UserDefined[ReservedMetadataPrefix+"Actual-Object-Size"]; v != "" {
			fi.Metadata[ReservedMetadataPrefix+"actual-size"] = v
		}
	} else {
		fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
	}

	if opts.DataMovement {
		fi.SetDataMov()
	}

	// Update all erasure metadata; make sure not to modify fields like
	// checksum which are different on each disk.
	for index := range partsMetadata {
		if partsMetadata[index].IsValid() {
			partsMetadata[index].Size = fi.Size
			partsMetadata[index].ModTime = fi.ModTime
			partsMetadata[index].Metadata = fi.Metadata
			partsMetadata[index].Parts = fi.Parts
			partsMetadata[index].Checksum = fi.Checksum
			partsMetadata[index].Versioned = opts.Versioned || opts.VersionSuspended
		}
	}

	paths := make([]string, 0, len(currentFI.Parts))
	// Remove parts that weren't present in CompleteMultipartUpload request.
	for _, curpart := range currentFI.Parts {
		paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d.meta", curpart.Number)))

		if objectPartIndex(fi.Parts, curpart.Number) == -1 {
			// Delete the missing part files. e.g.,
			// Request 1: NewMultipart
			// Request 2: PutObjectPart 1
			// Request 3: PutObjectPart 2
			// Request 4: CompleteMultipartUpload --part 2
			// N.B. 1st part is not present. This part should be removed from the storage.
			paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d", curpart.Number)))
		}
	}

	er.cleanupMultipartPath(ctx, paths...) // cleanup all part.N.meta, and skipped part.N's before final rename().

	defer func() {
		if err == nil {
			er.deleteAll(context.Background(), minioMetaMultipartBucket, uploadIDPath)
		}
	}()

	// Rename the multipart object to final location.
	onlineDisks, versions, oldDataDir, err := renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
		partsMetadata, bucket, object, writeQuorum)
	if err != nil {
		return oi, toObjectErr(err, bucket, object, uploadID)
	}

	if err = er.commitRenameDataDir(ctx, bucket, object, oldDataDir, onlineDisks, writeQuorum); err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object, uploadID)
	}

	if !opts.Speedtest && len(versions) > 0 {
		globalMRFState.addPartialOp(PartialOperation{
			Bucket:    bucket,
			Object:    object,
			Queued:    time.Now(),
			Versions:  versions,
			SetIndex:  er.setIndex,
			PoolIndex: er.poolIndex,
		})
	}

	if !opts.Speedtest && len(versions) == 0 {
		// Check if there is any offline disk and add it to the MRF list
		for _, disk := range onlineDisks {
			if disk != nil && disk.IsOnline() {
				continue
			}
			er.addPartial(bucket, object, fi.VersionID)
			break
		}
	}

	for i := 0; i < len(onlineDisks); i++ {
		if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
			// Object info is the same on all disks, so we can pick
			// the first metadata from an online disk.
			fi = partsMetadata[i]
			break
		}
	}

	// we are adding a new version to this object under the namespace lock, so this is the latest version.
	fi.IsLatest = true

	// Success, return object info.
	return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}

// AbortMultipartUpload - aborts an ongoing multipart operation
// signified by the input uploadID. This is an atomic operation and
// doesn't require clients to initiate multiple such requests.
//
// All parts are purged from all disks and the reference to the uploadID
// is removed from the system; rollback is not possible for this
// operation.
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
	if !opts.NoAuditLog {
		auditObjectErasureSet(ctx, "AbortMultipartUpload", object, &er)
	}

	lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return err
	}
	ctx = lkctx.Context()
	defer lk.Unlock(lkctx)

	// Validates if upload ID exists.
	if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil {
		if errors.Is(err, errVolumeNotFound) {
			return toObjectErr(err, bucket)
		}
		return toObjectErr(err, bucket, object, uploadID)
	}

	// Cleanup all uploaded parts.
	er.deleteAll(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID))

	// Successfully purged.
	return nil
}